From 33651b6125476e9fc91a1536970094ca60746687 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Wed, 13 May 2026 18:02:18 -0400 Subject: [PATCH] Add `allow_global` option to asset access control --- airflow-core/docs/migrations-ref.rst | 4 +- airflow-core/src/airflow/assets/manager.py | 10 +++- .../src/airflow/dag_processing/collection.py | 12 +++- .../example_dags/example_asset_allow_teams.py | 8 ++- ...olumns_to_dag_schedule_asset_reference.py} | 10 +++- airflow-core/src/airflow/models/asset.py | 3 + .../tests/unit/assets/test_manager.py | 57 ++++++++++++++++++- .../unit/dag_processing/test_collection.py | 10 ++-- 8 files changed, 95 insertions(+), 19 deletions(-) rename airflow-core/src/airflow/migrations/versions/{0114_3_3_0_add_allow_producer_teams_to_dag_schedule_asset_reference.py => 0114_3_3_0_add_access_control_columns_to_dag_schedule_asset_reference.py} (79%) diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 9d80414b64bb8..42056f2c6cbd8 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -41,8 +41,8 @@ Here's the list of all the Database Migrations that are executed via when you ru +=========================+==================+===================+==============================================================+ | ``a1b2c3d4e5f6`` (head) | ``a7f3b2c1d4e5`` | ``3.3.0`` | Add version_data to dag_version. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``a7f3b2c1d4e5`` | ``b8f3e4a1d2c9`` | ``3.3.0`` | Add allow_producer_teams column to | -| | | | dag_schedule_asset_reference table. | +| ``a7f3b2c1d4e5`` | ``b8f3e4a1d2c9`` | ``3.3.0`` | Add access control columns to dag_schedule_asset_reference | +| | | | table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``b8f3e4a1d2c9`` | ``fde9ed84d07b`` | ``3.3.0`` | Add retry_delay_override and retry_reason to task_instance. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index 9f333efe8510b..7c12c31f979d6 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -204,10 +204,13 @@ def _filter_dags_by_team( dag_ids = [dag.dag_id for dag in dags_to_queue] dag_id_to_team = DagModel.get_dag_id_to_team_name_mapping(dag_ids, session=session) - # Build per-consumer allow_producer_teams from the schedule reference rows. + # Build per-consumer allow_producer_teams and allow_global_producers from the schedule reference rows. dag_id_to_allow_teams: dict[str, list[str]] = { ref.dag_id: ref.allow_producer_teams or [] for ref in asset_model.scheduled_dags } + dag_id_to_allow_global: dict[str, bool] = { + ref.dag_id: ref.allow_global_producers for ref in asset_model.scheduled_dags + } filtered = set() for dag in dags_to_queue: @@ -222,8 +225,9 @@ def _filter_dags_by_team( if source_is_api: # Teamless API user can only trigger teamless consumers continue - # Teamless DAG producer is global — triggers all consumers - filtered.add(dag) + # Teamless DAG producer — check allow_global_producers + if dag_id_to_allow_global.get(dag.dag_id, True): + filtered.add(dag) continue if consumer_team in source_teams: diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index 94fb4f84530a5..785763bf9d8d6 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -896,23 +896,29 @@ def add_dag_asset_references( dags[dag_id].schedule_asset_references = [] continue referenced_assets = { - assets[r.name, r.uri]: r.access_control.get("producer_teams", []) for r in references + assets[r.name, r.uri]: ( + r.access_control.get("producer_teams", []), + r.access_control.get("allow_global", True), + ) + for r in references } referenced_asset_ids = {a.id for a in referenced_assets} orm_refs = {r.asset_id: r for r in dags[dag_id].schedule_asset_references} for asset_id, ref in orm_refs.items(): if asset_id not in referenced_asset_ids: session.delete(ref) - for asset_model, teams in referenced_assets.items(): + for asset_model, (teams, allow_global) in referenced_assets.items(): if asset_model.id in orm_refs: orm_refs[asset_model.id].allow_producer_teams = teams + orm_refs[asset_model.id].allow_global_producers = allow_global session.bulk_save_objects( DagScheduleAssetReference( asset_id=asset_model.id, dag_id=dag_id, allow_producer_teams=teams, + allow_global_producers=allow_global, ) - for asset_model, teams in referenced_assets.items() + for asset_model, (teams, allow_global) in referenced_assets.items() if asset_model.id not in orm_refs ) diff --git a/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py b/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py index f547057ce7364..d77eeb9d2b7ed 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py +++ b/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py @@ -23,8 +23,9 @@ Usage: - ``team_analytics_producer`` (belonging to ``team_analytics``) produces events on ``shared_data``. - ``team_ml_consumer`` (belonging to ``team_ml``) consumes ``shared_data``. - - Because ``shared_data`` has ``access_control=AssetAccessControl(producer_teams=["team_analytics"])``, - events from ``team_analytics`` are accepted by ``team_ml_consumer``. + - Because ``shared_data`` has ``access_control=AssetAccessControl(producer_teams=["team_analytics"], + allow_global=False)``, events from ``team_analytics`` are accepted by ``team_ml_consumer``, while + teamless (global) DAG producers are blocked. - Without ``access_control``, the cross-team event would be blocked. """ @@ -36,12 +37,13 @@ from airflow.sdk import DAG, Asset, AssetAccessControl # [START asset_access_control] -# Define an asset that accepts events from team_analytics. +# Define an asset that accepts events from team_analytics but blocks global (teamless) producers. shared_data = Asset( name="shared_data", uri="s3://data-lake/shared/output.csv", access_control=AssetAccessControl( producer_teams=["team_analytics"], + allow_global=False, ), ) diff --git a/airflow-core/src/airflow/migrations/versions/0114_3_3_0_add_allow_producer_teams_to_dag_schedule_asset_reference.py b/airflow-core/src/airflow/migrations/versions/0114_3_3_0_add_access_control_columns_to_dag_schedule_asset_reference.py similarity index 79% rename from airflow-core/src/airflow/migrations/versions/0114_3_3_0_add_allow_producer_teams_to_dag_schedule_asset_reference.py rename to airflow-core/src/airflow/migrations/versions/0114_3_3_0_add_access_control_columns_to_dag_schedule_asset_reference.py index d354607a84fce..6030be4162362 100644 --- a/airflow-core/src/airflow/migrations/versions/0114_3_3_0_add_allow_producer_teams_to_dag_schedule_asset_reference.py +++ b/airflow-core/src/airflow/migrations/versions/0114_3_3_0_add_access_control_columns_to_dag_schedule_asset_reference.py @@ -17,7 +17,7 @@ # under the License. """ -Add allow_producer_teams column to dag_schedule_asset_reference table. +Add access control columns to dag_schedule_asset_reference table. Revision ID: a7f3b2c1d4e5 Revises: b8f3e4a1d2c9 @@ -38,15 +38,19 @@ def upgrade(): - """Add allow_producer_teams column to dag_schedule_asset_reference.""" + """Add access control columns to dag_schedule_asset_reference.""" with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op: batch_op.add_column(sa.Column("allow_producer_teams", sa.JSON(), nullable=True)) + batch_op.add_column( + sa.Column("allow_global_producers", sa.Boolean(), nullable=False, server_default=sa.true()) + ) def downgrade(): - """Remove allow_producer_teams column from dag_schedule_asset_reference.""" + """Remove access control columns from dag_schedule_asset_reference.""" from airflow.migrations.utils import disable_sqlite_fkeys with disable_sqlite_fkeys(op): with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op: + batch_op.drop_column("allow_global_producers") batch_op.drop_column("allow_producer_teams") diff --git a/airflow-core/src/airflow/models/asset.py b/airflow-core/src/airflow/models/asset.py index 3e8390fdfaf6f..d07fc876b1798 100644 --- a/airflow-core/src/airflow/models/asset.py +++ b/airflow-core/src/airflow/models/asset.py @@ -592,6 +592,9 @@ class DagScheduleAssetReference(Base): asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False) dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False) allow_producer_teams: Mapped[list | None] = mapped_column(sa.JSON(), nullable=True) + allow_global_producers: Mapped[bool] = mapped_column( + sa.Boolean(), nullable=False, server_default=sa.true() + ) created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False) updated_at: Mapped[datetime] = mapped_column( UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 1f3eead3dc2eb..0585fac803a8f 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -328,14 +328,21 @@ def _make_dag(dag_id: str) -> DagModel: def _make_asset_model( scheduled_dags: dict[str, list[str]] | None = None, + allow_global: dict[str, bool] | None = None, ) -> AssetModel: """Create a mock AssetModel. :param scheduled_dags: mapping of dag_id -> allow_producer_teams for each consumer reference. + :param allow_global: mapping of dag_id -> allow_global_producers for each consumer reference. """ + allow_global = allow_global or {} model = mock.Mock(spec=AssetModel) model.scheduled_dags = [ - mock.Mock(dag_id=dag_id, allow_producer_teams=teams) + mock.Mock( + dag_id=dag_id, + allow_producer_teams=teams, + allow_global_producers=allow_global.get(dag_id, True), + ) for dag_id, teams in (scheduled_dags or {}).items() ] return model @@ -534,3 +541,51 @@ def test_both_teamless_allowed(self): ) assert dag in result + + @conf_vars({("core", "multi_team"): "true"}) + def test_teamless_dag_producer_blocked_when_allow_global_false(self): + """Teamless DAG producer is blocked when consumer's allow_global_producers=False.""" + dag = _make_dag("dag1") + + with mock.patch.object(DagModel, "get_dag_id_to_team_name_mapping", return_value={"dag1": "team_b"}): + result = AssetManager._filter_dags_by_team( + dags_to_queue={dag}, + source_teams=set(), + asset_model=_make_asset_model(scheduled_dags={"dag1": []}, allow_global={"dag1": False}), + source_is_api=False, + session=mock.Mock(), + ) + + assert dag not in result + + @conf_vars({("core", "multi_team"): "true"}) + def test_teamless_dag_producer_allowed_when_allow_global_true(self): + """Teamless DAG producer allowed when consumer's allow_global_producers=True (default).""" + dag = _make_dag("dag1") + + with mock.patch.object(DagModel, "get_dag_id_to_team_name_mapping", return_value={"dag1": "team_b"}): + result = AssetManager._filter_dags_by_team( + dags_to_queue={dag}, + source_teams=set(), + asset_model=_make_asset_model(scheduled_dags={"dag1": []}, allow_global={"dag1": True}), + source_is_api=False, + session=mock.Mock(), + ) + + assert dag in result + + @conf_vars({("core", "multi_team"): "true"}) + def test_teamless_api_user_not_affected_by_allow_global(self): + """Teamless API user behavior unchanged by allow_global — still blocked from team-bound consumers.""" + dag_with_team = _make_dag("dag1") + + with mock.patch.object(DagModel, "get_dag_id_to_team_name_mapping", return_value={"dag1": "team_b"}): + result = AssetManager._filter_dags_by_team( + dags_to_queue={dag_with_team}, + source_teams=set(), + asset_model=_make_asset_model(scheduled_dags={"dag1": []}, allow_global={"dag1": True}), + source_is_api=True, + session=mock.Mock(), + ) + + assert dag_with_team not in result diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py index eb6efc761527b..8a536dbb12a2d 100644 --- a/airflow-core/tests/unit/dag_processing/test_collection.py +++ b/airflow-core/tests/unit/dag_processing/test_collection.py @@ -144,16 +144,16 @@ def per_test(self) -> Generator: self.clean_db() @pytest.mark.usefixtures("testing_dag_bundle") - def test_sync_assets_preserves_allow_producer_teams_from_other_bundle(self, dag_maker, session): + def test_sync_assets_preserves_access_control_from_other_bundle(self, dag_maker, session): """When a producer bundle (without access_control) is synced after a consumer bundle - (with access_control), the stored allow_producer_teams must not be wiped out.""" + (with access_control), the stored access control fields must not be wiped out.""" from airflow.models.asset import DagScheduleAssetReference from airflow.sdk import AssetAccessControl # First sync: consumer bundle sets access_control on the asset. consumer_asset = Asset( "shared_asset", - access_control=AssetAccessControl(producer_teams=["team1", "team2"]), + access_control=AssetAccessControl(producer_teams=["team1", "team2"], allow_global=False), ) with dag_maker(dag_id="consumer_dag", schedule=[consumer_asset]) as consumer_dag: EmptyOperator(task_id="mytask") @@ -170,6 +170,7 @@ def test_sync_assets_preserves_allow_producer_teams_from_other_bundle(self, dag_ select(DagScheduleAssetReference).where(DagScheduleAssetReference.dag_id == "consumer_dag") ) assert ref.allow_producer_teams == ["team1", "team2"] + assert ref.allow_global_producers is False # Second sync: producer bundle references the same asset WITHOUT access_control. producer_asset = Asset("shared_asset") @@ -182,9 +183,10 @@ def test_sync_assets_preserves_allow_producer_teams_from_other_bundle(self, dag_ asset_op.sync_assets(session=session) session.flush() - # Consumer's allow_producer_teams must still be preserved. + # Consumer's access control must still be preserved. session.expire(ref) assert ref.allow_producer_teams == ["team1", "team2"] + assert ref.allow_global_producers is False @pytest.mark.parametrize( ("is_active", "is_paused", "expected_num_triggers"),