From e2d19cf0966c3833025b9c580f2e980e726d1f88 Mon Sep 17 00:00:00 2001 From: Anish Date: Thu, 21 May 2026 21:11:12 -0500 Subject: [PATCH] AIP-76: Add example and docs for runtime asset partitioning --- .../docs/authoring-and-scheduling/assets.rst | 42 +++++++++++++++ .../example_dags/example_asset_partition.py | 53 +++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst index a983544900c5c..9fbb232cbb306 100644 --- a/airflow-core/docs/authoring-and-scheduling/assets.rst +++ b/airflow-core/docs/authoring-and-scheduling/assets.rst @@ -596,5 +596,47 @@ including ``partition_key`` in the request body): "partition_key": "us|2026-03-10T09:00:00" }' +Setting partition keys at runtime +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When the partition key is not known ahead of time (for example, a watermark +discovered from the source data, a late-arriving file, or a backfill request), +let the producing task decide it while it runs. Schedule the producer with +``PartitionAtRuntime()`` and record the key(s) on the emitted event with +``outlet_events[self].add_partitions(...)``: + +.. code-block:: python + + from airflow.sdk import PartitionAtRuntime, asset + + + @asset( + uri="file://incoming/player-stats/live-region.csv", + schedule=PartitionAtRuntime(), + ) + def live_region_player_stats(self, outlet_events): + # The key is only known once the task runs. + outlet_events[self].add_partitions("us") + +Inside an ``@asset`` function, ``self`` (the emitted ``Asset``) and +``outlet_events`` (the outlet event accessor) are reserved parameter names that +Airflow populates at runtime. Pass a single key, or a list to fan out to several +partitions in one run. Each key produces its own asset event, and duplicate +keys collapse to a single event: + +.. 
code-block:: python + + @asset( + uri="file://incoming/player-stats/multi-region.csv", + schedule=PartitionAtRuntime(), + ) + def multi_region_player_stats(self, outlet_events): + outlet_events[self].add_partitions(["us", "eu", "apac"]) + +When a ``PartitionAtRuntime`` run emits exactly one partition key, the producing run's +``dag_run.partition_key`` is back-filled to that key. Downstream Dags consume +these events the same way as timetable-produced partitions, through +``PartitionedAssetTimetable``. + For complete runnable examples, see ``airflow-core/src/airflow/example_dags/example_asset_partition.py``. diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py index 75d582f6cad6e..f9c8258f951db 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_partition.py +++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py @@ -25,6 +25,7 @@ Asset, CronPartitionTimetable, IdentityMapper, + PartitionAtRuntime, PartitionedAssetTimetable, ProductMapper, StartOfDayMapper, @@ -225,3 +226,55 @@ def regional_stats_breakdown(): keys belong to a fixed set of allowed values (``us``, ``eu``, ``apac``) rather than time-based partitions. """ pass + + +@asset( + uri="file://incoming/player-stats/live-region.csv", + schedule=PartitionAtRuntime(), + tags=["player-stats", "runtime"], +) +def live_region_player_stats(self, outlet_events): + """ + Produce a single region partition whose key is decided at runtime. + + This asset demonstrates PartitionAtRuntime, which records the partition key on the + emitted event with ``add_partitions`` while the task runs rather than from a timetable. 
+ """ + outlet_events[self].add_partitions("us") + + +with DAG( + dag_id="summarize_live_region_stats", + schedule=PartitionedAssetTimetable(assets=Asset.ref(name="live_region_player_stats")), + catchup=False, + tags=["player-stats", "runtime"], +): + """ + Summarize the live region statistics for each runtime-emitted partition. + + Triggered once per partition key recorded upstream at runtime. + """ + + @task + def summarize_live_region(dag_run=None): + """Summarize stats for the matched runtime partition.""" + if TYPE_CHECKING: + assert dag_run + print(dag_run.partition_key) + + summarize_live_region() + + +@asset( + uri="file://incoming/player-stats/multi-region.csv", + schedule=PartitionAtRuntime(), + tags=["player-stats", "runtime"], +) +def multi_region_player_stats(self, outlet_events): + """ + Produce several region partitions from a single run. + + This asset demonstrates runtime fan-out, where each key emits its own asset event + and duplicate keys collapse to a single event. + """ + outlet_events[self].add_partitions(["us", "eu", "apac"])