Skip to content
24 changes: 22 additions & 2 deletions airflow-core/src/airflow/serialization/stringify.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
# under the License.
from __future__ import annotations

import re
from typing import Any, TypeVar

T = TypeVar("T", bool, float, int, dict, list, str, tuple, set)

# DagBag prefixes user-DAG modules with ``unusual_prefix_<40-char-sha>_`` so two
# DAG files with the same name in different bundles don't clash in ``sys.modules``.
# That prefix is deterministic and load-bearing for round-trip deserialization,
# but it has no place in the human-readable XCom value rendering.
_DAGBAG_PREFIX_RE = re.compile(r"unusual_prefix_[a-f0-9]{40}_")


class StringifyNotSupportedError(ValueError):
"""
Expand Down Expand Up @@ -128,14 +135,27 @@ def stringify(o: T | None) -> object:
return result

# only return string representation
s = f"{classname}@version={version}("
display_classname = _DAGBAG_PREFIX_RE.sub("", classname)
s = f"{display_classname}@version={version}("
if isinstance(value, _primitives):
s += f"{value}"
elif isinstance(value, _builtin_collections):
# deserialized values can be != str
s += ",".join(str(stringify(v)) for v in value)
elif isinstance(value, dict):
s += ",".join(f"{k}={stringify(v)}" for k, v in value.items())
# Render string field values with ``repr`` so the output reads like a
# Pydantic/dataclass instance (``field='value'``) instead of an
# ambiguous ``field=value`` that could be mistaken for a bare token.
# Non-string field values keep their natural rendering (numbers stay
# bare, nested serialized objects keep their own ``ClassName@...`` form).
parts = []
for k, v in value.items():
rendered = stringify(v)
if isinstance(v, str):
parts.append(f"{k}={v!r}")
else:
parts.append(f"{k}={rendered}")
s += ", ".join(parts)
s += ")"

return s
26 changes: 25 additions & 1 deletion airflow-core/tests/unit/serialization/test_stringify.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,30 @@ def test_stringify(self):
s = stringify(e)
assert "t=(1, 2)" in s

def test_stringify_quotes_string_fields(self):
"""String field values are repr-quoted so they read like a Pydantic/dataclass instance."""
e = {
CLASSNAME: "mymod.MyClass",
VERSION: 1,
"__data__": {"name": "alice", "age": 30, "active": True},
}
s = stringify(e)
assert "name='alice'" in s
assert "age=30" in s
assert "active=True" in s

def test_stringify_strips_dagbag_module_prefix(self):
"""DagBag's ``unusual_prefix_<sha>_`` is stripped from the displayed classname."""
e = {
CLASSNAME: "unusual_prefix_" + "a" * 40 + "_my_dag.MyModel",
VERSION: 1,
"__data__": {"field": "value"},
}
s = stringify(e)
assert "unusual_prefix_" not in s
assert "my_dag.MyModel@version=1" in s
assert "field='value'" in s

@pytest.mark.parametrize(
("value", "expected"),
[
Expand Down Expand Up @@ -194,7 +218,7 @@ def test_stringify_custom_object(self):
}
result = stringify(e)
assert "deltalake.table.DeltaTable@version=1" in result
assert "table_uri=s3://bucket/path" in result
assert "table_uri='s3://bucket/path'" in result
assert "version=0" in result

def test_stringify_empty_classname_error(self):
Expand Down
23 changes: 23 additions & 0 deletions providers/common/ai/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@
Changelog
---------

Breaking change: operators with ``output_type=<BaseModel subclass>``
(``LLMOperator``, ``LLMAgentOperator``, ``LLMFileAnalysisOperator``, and
their ``@task.llm`` / ``@task.agent`` / ``@task.llm_file_analysis`` decorators)
now return the Pydantic model instance through XCom instead of dumping it to
a ``dict`` when the running Airflow version provides
``airflow.sdk.serde.allow_class``. Downstream tasks should type-hint the model
class (``def downstream(result: MyModel)``) and use attribute access
(``result.field``) instead of subscript access. The output class must be
defined at **module scope** and bound to an attribute matching its
``__name__``; operators raise ``ValueError`` at construction time when
``output_type`` (or any ``BaseModel`` reachable from a ``Union``/``Optional``/
``list`` of types) is nested, dynamically built, or non-importable by ``qualname``.

Same-DAG downstream tasks deserialize the model without any configuration
change because each worker re-runs the operator constructor when it parses the
DAG. The UI XCom viewer renders the value via the ``stringify`` path and works
without configuration (it shows ``module.MyModel@version=1(field=value,...)``
rather than a pretty form, but no allow-list edit is required). Cross-DAG
``xcom_pull`` consumers still need the class qualified name added to
``[core] allowed_deserialization_classes`` -- the consumer DAG's worker only
parses its own DAG file. On older Airflow releases that lack ``allow_class``
the operators continue to dump to ``dict``.

0.3.0
.....

Expand Down
20 changes: 18 additions & 2 deletions providers/common/ai/docs/operators/agent.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,24 @@ to the model. This mirrors the input types accepted by pydantic-ai's
Structured Output
-----------------

Set ``output_type`` to a Pydantic ``BaseModel`` subclass to get structured
data back. The result is serialized via ``model_dump()`` for XCom.
Set ``output_type`` to a Pydantic ``BaseModel`` subclass to get structured data
back. The model instance is pushed to XCom unchanged so downstream tasks can
type-hint the class directly (``def downstream(result: MyModel)``) and use
attribute access (``result.field``).

The operator auto-registers ``output_type`` (and any ``BaseModel`` reachable
from ``Union``/``Optional``/``list`` shapes) for XCom deserialization in every
process that parses the DAG. The Pydantic class must be defined at **module
scope** and bound to an attribute matching its ``__name__``. Same-DAG
downstream tasks need no configuration. The UI's XCom viewer renders the value
via the ``stringify`` path (no configuration needed; see the ``LLMOperator``
guide for the exact representation). Cross-DAG ``xcom_pull`` consumers still
need the class ``qualname`` added to ``[core] allowed_deserialization_classes``.

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_agent.py
:language: python
:start-after: [START howto_decorator_agent_structured_output_class]
:end-before: [END howto_decorator_agent_structured_output_class]

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_agent.py
:language: python
Expand Down
39 changes: 37 additions & 2 deletions providers/common/ai/docs/operators/llm.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,49 @@ Structured Output
-----------------

Set ``output_type`` to a Pydantic ``BaseModel`` subclass. The LLM is instructed
to return structured data, and the result is serialized via ``model_dump()``
for XCom:
to return structured data, and the model instance is pushed to XCom unchanged
so downstream tasks can type-hint the class directly
(``def downstream(result: MyModel)``) and use attribute access (``result.field``).

The operator auto-registers ``output_type`` (and any ``BaseModel`` reachable from
``Union``/``Optional``/``list`` shapes) for XCom deserialization in every
process that parses the DAG. The Pydantic class must be defined at **module
scope** and bound to an attribute matching its ``__name__`` -- classes nested
inside a function or ``@dag``-decorated body, parameterized generics, and
dynamically-built classes whose ``__name__`` does not match the attribute they
are bound to are rejected at construction time with a ``ValueError``.

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
:language: python
:start-after: [START howto_operator_llm_structured_output_class]
:end-before: [END howto_operator_llm_structured_output_class]

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm.py
:language: python
:start-after: [START howto_operator_llm_structured]
:end-before: [END howto_operator_llm_structured]

Auto-registration covers downstream tasks in the **same DAG** -- their workers
parse the DAG file when starting up, which re-runs the operator constructor and
re-populates the per-process allow-list.

The Airflow UI's XCom viewer renders Pydantic instances via the
``stringify`` path, which produces a representation like
``my_module.MyModel@version=1(field=value,...)`` without consulting the
allow-list. It is not pretty (no field-by-field rendering today), but the value
shows up; no configuration is required.

The remaining gap is **cross-DAG** ``xcom_pull`` -- a task in a different DAG
that pulls this XCom only parses its own DAG file, not the producer's, so the
class is not auto-registered. Add the class qualified name to
``[core] allowed_deserialization_classes`` (or a glob that matches it) to make
that pattern work.

If a downstream consumer needs the dict shape (e.g. forwarding to an external
system that expects JSON-style payloads), pass ``serialize_output=True`` and the
operator calls ``model_dump()`` before pushing to XCom. The pre-PR behavior is
available on demand without giving up the typed default.

Agent Parameters
----------------

Expand Down
18 changes: 17 additions & 1 deletion providers/common/ai/docs/operators/llm_file_analysis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,23 @@ Structured Output
-----------------

Set ``output_type`` to a Pydantic ``BaseModel`` when you want a typed response
back from the LLM instead of a plain string:
back from the LLM instead of a plain string. The model instance is pushed to
XCom unchanged so downstream tasks can type-hint the class directly. The
operator auto-registers ``output_type`` (and any ``BaseModel`` reachable from
``Union``/``Optional``/``list`` shapes) for deserialization in every process
that parses the DAG. Define the class at **module scope** and bind it to an
attribute matching its ``__name__``: nested-in-function classes and
dynamically-built classes are rejected at construction time. Same-DAG
downstream tasks need no configuration; the UI XCom viewer renders the value
via the ``stringify`` path (no configuration needed). Cross-DAG ``xcom_pull``
consumers still need the class ``qualname`` added to
``[core] allowed_deserialization_classes`` (see the ``LLMOperator`` guide for
details).

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
:language: python
:start-after: [START howto_operator_llm_file_analysis_structured_output_class]
:end-before: [END howto_operator_llm_file_analysis_structured_output_class]

.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_llm_file_analysis.py
:language: python
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
TaskFlow decorator for general-purpose LLM calls.
The user writes a function that **returns the prompt string**. The decorator
handles hook creation, agent configuration, LLM call, and output serialization.
When ``output_type`` is a Pydantic ``BaseModel``, the result is serialized via
``model_dump()`` for XCom.
handles hook creation, agent configuration, and the LLM call. When
``output_type`` is a Pydantic ``BaseModel`` subclass, the model instance is
returned to XCom unchanged so downstream tasks can type-hint it directly.
The class must be defined at module scope.
"""

from __future__ import annotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,28 @@

from datetime import timedelta

from pydantic import BaseModel

from airflow.providers.common.ai.operators.agent import AgentOperator
from airflow.providers.common.ai.toolsets.hook import HookToolset
from airflow.providers.common.ai.toolsets.sql import SQLToolset
from airflow.providers.common.compat.sdk import dag, task


# [START howto_decorator_agent_structured_output_class]
# Pydantic output classes must be defined at module scope so downstream
# tasks can re-import them when deserializing the XCom payload.
class Analysis(BaseModel):
"""Structured analysis output for the agent example."""

summary: str
top_items: list[str]
row_count: int


# [END howto_decorator_agent_structured_output_class]


# ---------------------------------------------------------------------------
# 1. SQL Agent: answer a question using database tools
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -125,13 +142,6 @@ def analyze(question: str):
# [START howto_decorator_agent_structured]
@dag(tags=["example"])
def example_agent_structured_output():
from pydantic import BaseModel

class Analysis(BaseModel):
summary: str
top_items: list[str]
row_count: int

@task.agent(
llm_conn_id="pydanticai_default",
system_prompt="You are a data analyst. Return structured results.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@
from airflow.providers.common.compat.sdk import dag, task


# [START howto_operator_llm_structured_output_class]
# Pydantic output classes must be defined at module scope so they survive
# XCom serialization (their qualname is used to re-import them downstream).
class Entities(BaseModel):
"""Named entities extracted from a text."""

names: list[str]
locations: list[str]


# [END howto_operator_llm_structured_output_class]


# [START howto_operator_llm_basic]
@dag(tags=["example"])
def example_llm_operator():
Expand All @@ -46,10 +59,6 @@ def example_llm_operator():
# [START howto_operator_llm_structured]
@dag(tags=["example"])
def example_llm_operator_structured():
class Entities(BaseModel):
names: list[str]
locations: list[str]

LLMOperator(
task_id="extract_entities",
prompt="Extract all named entities from the article.",
Expand Down Expand Up @@ -99,10 +108,6 @@ def summarize(text: str):
# [START howto_decorator_llm_structured]
@dag(tags=["example"])
def example_llm_decorator_structured():
class Entities(BaseModel):
names: list[str]
locations: list[str]

@task.llm(
llm_conn_id="pydanticai_default",
system_prompt="Extract named entities.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@
from airflow.providers.common.compat.sdk import dag, task


# Pydantic output classes must be defined at module scope so they can be
# imported by name when downstream tasks deserialize the XCom payload.
class TicketAnalysis(BaseModel):
"""Structured analysis of a single support ticket."""

priority: str
category: str
summary: str
suggested_action: str


# [START howto_decorator_llm_pipeline]
@dag(tags=["example"])
def example_llm_analysis_pipeline():
class TicketAnalysis(BaseModel):
priority: str
category: str
summary: str
suggested_action: str

@task
def get_support_tickets():
"""Fetch unprocessed support tickets."""
Expand Down Expand Up @@ -66,10 +71,10 @@ def analyze_ticket(ticket: str):
return f"Analyze this support ticket:\n\n{ticket}"

@task
def store_results(analyses: list[dict]):
def store_results(analyses: list[TicketAnalysis]):
"""Store ticket analyses. In production, this would write to a database or ticketing system."""
for analysis in analyses:
print(f"[{analysis['priority'].upper()}] {analysis['category']}: {analysis['summary']}")
print(f"[{analysis.priority.upper()}] {analysis.category}: {analysis.summary}")

tickets = get_support_tickets()
analyses = analyze_ticket.expand(ticket=tickets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@
from airflow.providers.common.compat.sdk import dag, task


# [START howto_operator_llm_file_analysis_structured_output_class]
# Pydantic output classes must be defined at module scope so they can be
# imported by name when downstream tasks deserialize the XCom payload.
class FileAnalysisSummary(BaseModel):
"""Structured output schema for the file-analysis examples."""

findings: list[str]
highest_severity: str
truncated_inputs: bool


# [END howto_operator_llm_file_analysis_structured_output_class]


# [START howto_operator_llm_file_analysis_basic]
@dag(tags=["example"])
def example_llm_file_analysis_basic():
Expand Down Expand Up @@ -85,14 +99,6 @@ def example_llm_file_analysis_multimodal():
# [START howto_operator_llm_file_analysis_structured]
@dag(tags=["example"])
def example_llm_file_analysis_structured():

class FileAnalysisSummary(BaseModel):
"""Structured output schema for the file-analysis examples."""

findings: list[str]
highest_severity: str
truncated_inputs: bool

LLMFileAnalysisOperator(
task_id="analyze_parquet_quality",
prompt=(
Expand Down
Loading
Loading