From 05f7780295b1ec6ae3728a0dcedd8820090d2882 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Fri, 8 May 2026 08:11:10 -0400 Subject: [PATCH 1/9] feature/aip-93-scoping: Passing Asset name/uri through to BaseEventTrigger --- .../airflow/executors/workloads/trigger.py | 4 ++++ .../src/airflow/jobs/triggerer_job_runner.py | 21 +++++++++++++++++++ airflow-core/src/airflow/triggers/base.py | 7 +++++++ 3 files changed, 32 insertions(+) diff --git a/airflow-core/src/airflow/executors/workloads/trigger.py b/airflow-core/src/airflow/executors/workloads/trigger.py index edde48f7f73f9..2a6c1ea2a855e 100644 --- a/airflow-core/src/airflow/executors/workloads/trigger.py +++ b/airflow-core/src/airflow/executors/workloads/trigger.py @@ -46,3 +46,7 @@ class RunTrigger(BaseModel): dag_run_data: dict | None = ( None # Serialized DagRun data in dict format so it can be deserialized in trigger subprocess. ) + + # aip-93 + watched_asset_name: str | None = None # Set for BaseEventTrigger asset watchers only. + watched_asset_uri: str | None = None # Set for BaseEventTrigger asset watchers only. diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 660bba03c7236..31b32a5ad085a 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -760,11 +760,23 @@ def _create_workload( render_log_fname: Callable[..., str], session: Session, ) -> workloads.RunTrigger | None: + # aip-93: Why are we doing this? if trigger.task_instance is None: + asset_name: str | None = None + asset_uri: str | None = None + + # aip-93: Is it always going to be the first asset from the asset_watchers list? + if trigger.asset_watchers: + first_asset = trigger.asset_watchers[0].asset + asset_name = first_asset.name + asset_uri = first_asset.uri + return workloads.RunTrigger( id=trigger.id, classpath=trigger.classpath, encrypted_kwargs=trigger.encrypted_kwargs, + watched_asset_name=asset_name, + watched_asset_uri=asset_uri, ) if not trigger.task_instance.dag_version_id: @@ -1267,6 +1279,15 @@ async def create_triggers(self): trigger_instance.triggerer_job_id = self.job_id trigger_instance.timeout_after = workload.timeout_after + # aip-93 + if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_asset_uri: + from airflow.sdk.definitions.asset import AssetUniqueKey + + trigger_instance.watched_asset = AssetUniqueKey( + name=workload.watched_asset_name, + uri=workload.watched_asset_uri, + ) + self.triggers[trigger_id] = { "task": asyncio.create_task( self.run_trigger(trigger_id, trigger_instance, workload.timeout_after, context), diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index f39b62facf7b2..bfa79a60d52cb 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -45,6 +45,7 @@ from airflow.models.mappedoperator import MappedOperator from airflow.models.taskinstance import TaskInstance + from airflow.sdk.definitions.asset import AssetUniqueKey from airflow.sdk.definitions.context import Context from airflow.serialization.serialized_objects import SerializedBaseOperator @@ -255,6 +256,12 @@ class BaseEventTrigger(BaseTrigger): supports_triggerer_queue: bool = False + def __init__(self, **kwargs): + super().__init__(**kwargs) + + # Injected by the triggerer before run() is called; mirrors how trigger_id is set + self.watched_asset: AssetUniqueKey | None = None + @staticmethod def hash(classpath: str, kwargs: dict[str, Any]) -> int: """ From 8788b3d11d01f3a13e7a6ecd46ab41f5726885a7 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Fri, 8 May 2026 09:04:53 -0400 Subject: [PATCH 2/9] feature/aip-93-scoping: Removing comment --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 31b32a5ad085a..0f08258629b61 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -760,7 +760,7 @@ def _create_workload( render_log_fname: Callable[..., str], session: Session, ) -> workloads.RunTrigger | None: - # aip-93: Why are we doing this? + # aip-93 if trigger.task_instance is None: asset_name: str | None = None asset_uri: str | None = None From e4c1db245950def036ee9066a7290cb44f8e916a Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Sun, 17 May 2026 13:08:16 -0400 Subject: [PATCH 3/9] feature/aip-93-scoping: Moving from single to multi-Asset --- .../airflow/executors/workloads/trigger.py | 5 ++-- .../src/airflow/jobs/triggerer_job_runner.py | 23 ++++++++----------- airflow-core/src/airflow/triggers/base.py | 2 +- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/airflow-core/src/airflow/executors/workloads/trigger.py b/airflow-core/src/airflow/executors/workloads/trigger.py index 2a6c1ea2a855e..85c3d78cf506b 100644 --- a/airflow-core/src/airflow/executors/workloads/trigger.py +++ b/airflow-core/src/airflow/executors/workloads/trigger.py @@ -47,6 +47,5 @@ class RunTrigger(BaseModel): None # Serialized DagRun data in dict format so it can be deserialized in trigger subprocess. ) - # aip-93 - watched_asset_name: str | None = None # Set for BaseEventTrigger asset watchers only. - watched_asset_uri: str | None = None # Set for BaseEventTrigger asset watchers only. + # aip-93: name: uri of all "watched" Assets + watched_assets: dict[str, str] | None = None # Set for BaseEventTrigger asset watchers only diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index f39d9c9bd438a..cf94090765c50 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -774,21 +774,17 @@ def _create_workload( ) -> workloads.RunTrigger | None: # aip-93 if trigger.task_instance is None: - asset_name: str | None = None - asset_uri: str | None = None + watched_assets: dict[str, str] | None = None - # aip-93: Is it always going to be the first asset from the asset_watchers list? + # aip-93 if trigger.asset_watchers: - first_asset = trigger.asset_watchers[0].asset - asset_name = first_asset.name - asset_uri = first_asset.uri + watched_assets = {a.name: a.uri for a in trigger.assets} return workloads.RunTrigger( id=trigger.id, classpath=trigger.classpath, encrypted_kwargs=trigger.encrypted_kwargs, - watched_asset_name=asset_name, - watched_asset_uri=asset_uri, + watched_assets=watched_assets, ) if not trigger.task_instance.dag_version_id: @@ -1291,14 +1287,13 @@ async def create_triggers(self): trigger_instance.triggerer_job_id = self.job_id trigger_instance.timeout_after = workload.timeout_after - # aip-93 - if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_asset_uri: + # aip-93: Pass through all watched assets from workload + if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_assets: from airflow.sdk.definitions.asset import AssetUniqueKey - trigger_instance.watched_asset = AssetUniqueKey( - name=workload.watched_asset_name, - uri=workload.watched_asset_uri, - ) + trigger_instance.watched_assets = [ + AssetUniqueKey(name=name, uri=uri) for name, uri in workload.watched_assets.items() + ] self.triggers[trigger_id] = { "task": asyncio.create_task( diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index bfa79a60d52cb..9134269d52002 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -260,7 +260,7 @@ def __init__(self, **kwargs): super().__init__(**kwargs) # Injected by the triggerer before run() is called; mirrors how trigger_id is set - self.watched_asset: AssetUniqueKey | None = None + self.watched_assets: list[AssetUniqueKey] | None = None @staticmethod def hash(classpath: str, kwargs: dict[str, Any]) -> int: From ce953efbecb3513a1cce9f5cc2c35cff966465a6 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Wed, 27 May 2026 08:01:41 -0400 Subject: [PATCH 4/9] feature/aip-93-scoping: Adding AssetStateAccessor --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 8 ++++++++ airflow-core/src/airflow/triggers/base.py | 2 ++ 2 files changed, 10 insertions(+) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 4faf3dd2d8392..7e602aaaa40eb 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -1270,6 +1270,14 @@ async def create_triggers(self): AssetUniqueKey(name=name, uri=uri) for name, uri in workload.watched_assets.items() ] + # aip-103: Reconstruct AssetStateAccessors from watched_assets + from airflow.sdk.definitions.asset import Asset + from airflow.sdk.execution_time.context import AssetStateAccessors + + trigger_instance.asset_states = AssetStateAccessors( + inlets=[Asset(name=name, uri=uri) for name, uri in workload.watched_assets.items()] + ) + self.triggers[trigger_id] = { "task": asyncio.create_task( self.run_trigger(trigger_id, trigger_instance, workload.timeout_after, context), diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index f6e45c13e2a69..b612c0556e3da 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -33,6 +33,7 @@ ) from airflow.sdk.definitions._internal.templater import Templater +from airflow.sdk.execution_time.context import AssetStateAccessors from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState @@ -303,6 +304,7 @@ def __init__(self, **kwargs): # Injected by the triggerer before run() is called; mirrors how trigger_id is set self.watched_assets: list[AssetUniqueKey] | None = None + self.asset_states: AssetStateAccessors | None = None @staticmethod def hash(classpath: str, kwargs: dict[str, Any]) -> int: From 7be572d89d7f36ab2be8ba0f048989f643873a62 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Wed, 27 May 2026 09:50:47 -0400 Subject: [PATCH 5/9] feature/aip-93-scoping: Adding comment --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 7e602aaaa40eb..87850ff5dbbe6 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -1274,6 +1274,7 @@ async def create_triggers(self): from airflow.sdk.definitions.asset import Asset from airflow.sdk.execution_time.context import AssetStateAccessors + # Potentially address Asset vs. AssetRef, AssetUriRef, etc. trigger_instance.asset_states = AssetStateAccessors( inlets=[Asset(name=name, uri=uri) for name, uri in workload.watched_assets.items()] ) From 819795f884b1e0c8bd0466af571606707410152a Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Wed, 27 May 2026 14:35:00 -0400 Subject: [PATCH 6/9] feature/aip-93-scoping: Adding comment to address potential change --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 87850ff5dbbe6..d9bf95ff27feb 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -1266,6 +1266,7 @@ async def create_triggers(self): if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_assets: from airflow.sdk.definitions.asset import AssetUniqueKey + # If we only want asset_states, we can just remove this line! trigger_instance.watched_assets = [ AssetUniqueKey(name=name, uri=uri) for name, uri in workload.watched_assets.items() ] From 788acb3ea74a00164acdf79470ec08701554935b Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Thu, 28 May 2026 08:46:01 -0400 Subject: [PATCH 7/9] feature/aip-93-scoping: Updating asset_states -> asset_state to match context['asset_state'] --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 4 ++-- airflow-core/src/airflow/triggers/base.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index f5513118685d8..f6238999eca5a 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -1274,7 +1274,7 @@ async def create_triggers(self): if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_assets: from airflow.sdk.definitions.asset import AssetUniqueKey - # If we only want asset_states, we can just remove this line! + # If we only want asset_state, we can just remove this line! trigger_instance.watched_assets = [ AssetUniqueKey(name=name, uri=uri) for name, uri in workload.watched_assets.items() ] @@ -1284,7 +1284,7 @@ async def create_triggers(self): from airflow.sdk.execution_time.context import AssetStateAccessors # Potentially address Asset vs. AssetRef, AssetUriRef, etc. - trigger_instance.asset_states = AssetStateAccessors( + trigger_instance.asset_state = AssetStateAccessors( inlets=[Asset(name=name, uri=uri) for name, uri in workload.watched_assets.items()] ) diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index b612c0556e3da..7553fe9a110fb 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -304,7 +304,7 @@ def __init__(self, **kwargs): # Injected by the triggerer before run() is called; mirrors how trigger_id is set self.watched_assets: list[AssetUniqueKey] | None = None - self.asset_states: AssetStateAccessors | None = None + self.asset_state: AssetStateAccessors | None = None @staticmethod def hash(classpath: str, kwargs: dict[str, Any]) -> int: From b7cf568ea5dced9e73b9b9011610c6d499b833cc Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Sat, 30 May 2026 08:15:26 -0400 Subject: [PATCH 8/9] feature/aip-93-scoping: Only passing asset_state through to BaseEventTrigger --- .../src/airflow/executors/workloads/trigger.py | 2 +- .../src/airflow/jobs/triggerer_job_runner.py | 13 ++----------- airflow-core/src/airflow/triggers/base.py | 2 -- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/airflow-core/src/airflow/executors/workloads/trigger.py b/airflow-core/src/airflow/executors/workloads/trigger.py index 85c3d78cf506b..d3b2d0627a7ec 100644 --- a/airflow-core/src/airflow/executors/workloads/trigger.py +++ b/airflow-core/src/airflow/executors/workloads/trigger.py @@ -47,5 +47,5 @@ class RunTrigger(BaseModel): None # Serialized DagRun data in dict format so it can be deserialized in trigger subprocess. ) - # aip-93: name: uri of all "watched" Assets + # name: uri of all "watched" Assets watched_assets: dict[str, str] | None = None # Set for BaseEventTrigger asset watchers only diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index f6238999eca5a..72e6cfb3dd946 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -737,11 +737,10 @@ def _create_workload( render_log_fname: Callable[..., str], session: Session, ) -> workloads.RunTrigger | None: - # aip-93 + # Pass the "watched" Assets through for downstream use in BaseEventTrigger if trigger.task_instance is None: watched_assets: dict[str, str] | None = None - # aip-93 if trigger.asset_watchers: watched_assets = {a.name: a.uri for a in trigger.assets} @@ -1270,16 +1269,8 @@ async def create_triggers(self): trigger_instance.triggerer_job_id = self.job_id trigger_instance.timeout_after = workload.timeout_after - # aip-93: Pass through all watched assets from workload if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_assets: - from airflow.sdk.definitions.asset import AssetUniqueKey - - # If we only want asset_state, we can just remove this line! - trigger_instance.watched_assets = [ - AssetUniqueKey(name=name, uri=uri) for name, uri in workload.watched_assets.items() - ] - - # aip-103: Reconstruct AssetStateAccessors from watched_assets + # Reconstruct AssetStateAccessors from watched_assets from airflow.sdk.definitions.asset import Asset from airflow.sdk.execution_time.context import AssetStateAccessors diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index 7553fe9a110fb..124232e7d4097 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -46,7 +46,6 @@ from airflow.models.mappedoperator import MappedOperator from airflow.models.taskinstance import TaskInstance - from airflow.sdk.definitions.asset import AssetUniqueKey from airflow.sdk.definitions.context import Context from airflow.serialization.serialized_objects import SerializedBaseOperator @@ -303,7 +302,6 @@ def __init__(self, **kwargs): super().__init__(**kwargs) # Injected by the triggerer before run() is called; mirrors how trigger_id is set - self.watched_assets: list[AssetUniqueKey] | None = None self.asset_state: AssetStateAccessors | None = None @staticmethod From ec3590c20c28c2c29bb56d0cf088dd49d4c3ee51 Mon Sep 17 00:00:00 2001 From: Jake Roach <116606359+jroachgolf84@users.noreply.github.com> Date: Tue, 2 Jun 2026 15:30:02 -0400 Subject: [PATCH 9/9] feature/expore-asset-state-accessor: Implementing feedback --- airflow-core/src/airflow/jobs/triggerer_job_runner.py | 6 +++--- airflow-core/src/airflow/triggers/base.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index f959fe3d3d17b..58b1b1741f69e 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -745,7 +745,7 @@ def _create_workload( if trigger.task_instance is None: watched_assets: dict[str, str] | None = None - if trigger.asset_watchers: + if trigger.assets: watched_assets = {a.name: a.uri for a in trigger.assets} return workloads.RunTrigger( @@ -1276,10 +1276,10 @@ async def create_triggers(self): if isinstance(trigger_instance, BaseEventTrigger) and workload.watched_assets: # Reconstruct AssetStateAccessors from watched_assets from airflow.sdk.definitions.asset import Asset - from airflow.sdk.execution_time.context import AssetStateAccessors + from airflow.sdk.execution_time.context import AssetStoreAccessors # Potentially address Asset vs. AssetRef, AssetUriRef, etc. - trigger_instance.asset_state = AssetStateAccessors( + trigger_instance.asset_store = AssetStoreAccessors( inlets=[Asset(name=name, uri=uri) for name, uri in workload.watched_assets.items()] ) diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index 124232e7d4097..0a47f6247d374 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -33,7 +33,6 @@ ) from airflow.sdk.definitions._internal.templater import Templater -from airflow.sdk.execution_time.context import AssetStateAccessors from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState @@ -47,6 +46,7 @@ from airflow.models.mappedoperator import MappedOperator from airflow.models.taskinstance import TaskInstance from airflow.sdk.definitions.context import Context + from airflow.sdk.execution_time.context import AssetStoreAccessors from airflow.serialization.serialized_objects import SerializedBaseOperator Operator: TypeAlias = MappedOperator | SerializedBaseOperator @@ -302,7 +302,7 @@ def __init__(self, **kwargs): super().__init__(**kwargs) # Injected by the triggerer before run() is called; mirrors how trigger_id is set - self.asset_state: AssetStateAccessors | None = None + self.asset_store: AssetStoreAccessors | None = None @staticmethod def hash(classpath: str, kwargs: dict[str, Any]) -> int: