Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions airflow-core/docs/authoring-and-scheduling/assets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -596,5 +596,47 @@ including ``partition_key`` in the request body):
"partition_key": "us|2026-03-10T09:00:00"
}'

Setting partition keys at runtime
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When the partition key is not known ahead of time (for example, a watermark
discovered from the source data, a late-arriving file, or a backfill request),
let the producing task decide it while it runs. Schedule the producer with
``PartitionAtRuntime()`` and record the key(s) on the emitted event with
``outlet_events[self].add_partitions(...)``:

.. code-block:: python

from airflow.sdk import PartitionAtRuntime, asset


@asset(
uri="file://incoming/player-stats/live-region.csv",
schedule=PartitionAtRuntime(),
)
def live_region_player_stats(self, outlet_events):
# The key is only known once the task runs.
outlet_events[self].add_partitions("us")

Inside an ``@asset`` function, ``self`` (the emitted ``Asset``) and
``outlet_events`` (the outlet event accessor) are reserved parameter names that
Airflow populates at runtime. Pass a single key, or a list to fan out to several
partitions in one run. Each key produces its own asset event, and duplicate
keys collapse to a single event:

.. code-block:: python

@asset(
uri="file://incoming/player-stats/multi-region.csv",
schedule=PartitionAtRuntime(),
)
def multi_region_player_stats(self, outlet_events):
outlet_events[self].add_partitions(["us", "eu", "apac"])

When a runtime run emits exactly one partition key, the producing
``dag_run.partition_key`` is back-filled to that key. Downstream Dags consume
these events the same way as timetable-produced partitions, through
``PartitionedAssetTimetable``.

For complete runnable examples, see
``airflow-core/src/airflow/example_dags/example_asset_partition.py``.
53 changes: 53 additions & 0 deletions airflow-core/src/airflow/example_dags/example_asset_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Asset,
CronPartitionTimetable,
IdentityMapper,
PartitionAtRuntime,
PartitionedAssetTimetable,
ProductMapper,
StartOfDayMapper,
Expand Down Expand Up @@ -225,3 +226,55 @@ def regional_stats_breakdown():
keys belong to a fixed set of allowed values (``us``, ``eu``, ``apac``) rather than time-based partitions.
"""
pass


@asset(
uri="file://incoming/player-stats/live-region.csv",
schedule=PartitionAtRuntime(),
tags=["player-stats", "runtime"],
)
def live_region_player_stats(self, outlet_events):
"""
Produce a single region partition whose key is decided at runtime.

This asset demonstrates PartitionAtRuntime, which records the partition key on the
emitted event with ``add_partitions`` while the task runs rather than from a timetable.
"""
outlet_events[self].add_partitions("us")


with DAG(
dag_id="summarize_live_region_stats",
schedule=PartitionedAssetTimetable(assets=Asset.ref(name="live_region_player_stats")),
catchup=False,
tags=["player-stats", "runtime"],
):
"""
Summarize the live region statistics for each runtime-emitted partition.

Triggered once per partition key recorded upstream at runtime.
"""

@task
def summarize_live_region(dag_run=None):
"""Summarize stats for the matched runtime partition."""
if TYPE_CHECKING:
assert dag_run
print(dag_run.partition_key)

summarize_live_region()


@asset(
uri="file://incoming/player-stats/multi-region.csv",
schedule=PartitionAtRuntime(),
tags=["player-stats", "runtime"],
)
def multi_region_player_stats(self, outlet_events):
"""
Produce several region partitions from a single run.

This asset demonstrates runtime fan-out, where each key emits its own asset event
and duplicate keys collapse to a single event.
"""
outlet_events[self].add_partitions(["us", "eu", "apac"])
Loading