Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ Here's the list of all the Database Migrations that are executed via when you ru
+=========================+==================+===================+==============================================================+
| ``a1b2c3d4e5f6`` (head) | ``a7f3b2c1d4e5`` | ``3.3.0`` | Add version_data to dag_version. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``a7f3b2c1d4e5`` | ``b8f3e4a1d2c9`` | ``3.3.0`` | Add allow_producer_teams column to |
| | | | dag_schedule_asset_reference table. |
| ``a7f3b2c1d4e5`` | ``b8f3e4a1d2c9`` | ``3.3.0`` | Add access control columns to dag_schedule_asset_reference |
| | | | table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``b8f3e4a1d2c9`` | ``fde9ed84d07b`` | ``3.3.0`` | Add retry_delay_override and retry_reason to task_instance. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
10 changes: 7 additions & 3 deletions airflow-core/src/airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,13 @@ def _filter_dags_by_team(
dag_ids = [dag.dag_id for dag in dags_to_queue]
dag_id_to_team = DagModel.get_dag_id_to_team_name_mapping(dag_ids, session=session)

# Build per-consumer allow_producer_teams from the schedule reference rows.
# Build per-consumer allow_producer_teams and allow_global_producers from the schedule reference rows.
dag_id_to_allow_teams: dict[str, list[str]] = {
ref.dag_id: ref.allow_producer_teams or [] for ref in asset_model.scheduled_dags
}
dag_id_to_allow_global: dict[str, bool] = {
ref.dag_id: ref.allow_global_producers for ref in asset_model.scheduled_dags
}

filtered = set()
for dag in dags_to_queue:
Expand All @@ -222,8 +225,9 @@ def _filter_dags_by_team(
if source_is_api:
# Teamless API user can only trigger teamless consumers
continue
# Teamless DAG producer is global — triggers all consumers
filtered.add(dag)
# Teamless DAG producer — check allow_global_producers
if dag_id_to_allow_global.get(dag.dag_id, True):
filtered.add(dag)
continue

if consumer_team in source_teams:
Expand Down
12 changes: 9 additions & 3 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,23 +896,29 @@ def add_dag_asset_references(
dags[dag_id].schedule_asset_references = []
continue
referenced_assets = {
assets[r.name, r.uri]: r.access_control.get("producer_teams", []) for r in references
assets[r.name, r.uri]: (
r.access_control.get("producer_teams", []),
r.access_control.get("allow_global", True),
)
for r in references
}
referenced_asset_ids = {a.id for a in referenced_assets}
orm_refs = {r.asset_id: r for r in dags[dag_id].schedule_asset_references}
for asset_id, ref in orm_refs.items():
if asset_id not in referenced_asset_ids:
session.delete(ref)
for asset_model, teams in referenced_assets.items():
for asset_model, (teams, allow_global) in referenced_assets.items():
if asset_model.id in orm_refs:
orm_refs[asset_model.id].allow_producer_teams = teams
orm_refs[asset_model.id].allow_global_producers = allow_global
session.bulk_save_objects(
DagScheduleAssetReference(
asset_id=asset_model.id,
dag_id=dag_id,
allow_producer_teams=teams,
allow_global_producers=allow_global,
)
for asset_model, teams in referenced_assets.items()
for asset_model, (teams, allow_global) in referenced_assets.items()
if asset_model.id not in orm_refs
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
Usage:
- ``team_analytics_producer`` (belonging to ``team_analytics``) produces events on ``shared_data``.
- ``team_ml_consumer`` (belonging to ``team_ml``) consumes ``shared_data``.
- Because ``shared_data`` has ``access_control=AssetAccessControl(producer_teams=["team_analytics"])``,
events from ``team_analytics`` are accepted by ``team_ml_consumer``.
- Because ``shared_data`` has ``access_control=AssetAccessControl(producer_teams=["team_analytics"],
allow_global=False)``, events from ``team_analytics`` are accepted by ``team_ml_consumer``, while
teamless (global) DAG producers are blocked.
- Without ``access_control``, the cross-team event would be blocked.
"""

Expand All @@ -36,12 +37,13 @@
from airflow.sdk import DAG, Asset, AssetAccessControl

# [START asset_access_control]
# Define an asset that accepts events from team_analytics.
# Define an asset that accepts events from team_analytics but blocks global (teamless) producers.
shared_data = Asset(
name="shared_data",
uri="s3://data-lake/shared/output.csv",
access_control=AssetAccessControl(
producer_teams=["team_analytics"],
allow_global=False,
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.

"""
Add allow_producer_teams column to dag_schedule_asset_reference table.
Add access control columns to dag_schedule_asset_reference table.
Comment thread
vincbeck marked this conversation as resolved.

Revision ID: a7f3b2c1d4e5
Revises: b8f3e4a1d2c9
Expand All @@ -38,15 +38,19 @@


def upgrade():
    """
    Add access control columns to dag_schedule_asset_reference.

    Adds ``allow_producer_teams`` (nullable JSON) and ``allow_global_producers``
    (non-nullable boolean whose server-side default is true).
    """
    with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op:
        batch_op.add_column(sa.Column("allow_producer_teams", sa.JSON(), nullable=True))
        # server_default supplies a value for rows that already exist, which the
        # NOT NULL constraint on the new column requires.
        batch_op.add_column(
            sa.Column("allow_global_producers", sa.Boolean(), nullable=False, server_default=sa.true())
        )


def downgrade():
    """
    Remove access control columns from dag_schedule_asset_reference.

    Drops ``allow_global_producers`` and ``allow_producer_teams``.
    """
    from airflow.migrations.utils import disable_sqlite_fkeys

    # NOTE(review): disable_sqlite_fkeys presumably suspends SQLite foreign-key
    # enforcement while the batch operation runs -- confirm in airflow.migrations.utils.
    with disable_sqlite_fkeys(op):
        with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op:
            batch_op.drop_column("allow_global_producers")
            batch_op.drop_column("allow_producer_teams")
3 changes: 3 additions & 0 deletions airflow-core/src/airflow/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,9 @@ class DagScheduleAssetReference(Base):
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
allow_producer_teams: Mapped[list | None] = mapped_column(sa.JSON(), nullable=True)
allow_global_producers: Mapped[bool] = mapped_column(
sa.Boolean(), nullable=False, server_default=sa.true()
Comment thread
vincbeck marked this conversation as resolved.
)
created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
Expand Down
57 changes: 56 additions & 1 deletion airflow-core/tests/unit/assets/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,21 @@ def _make_dag(dag_id: str) -> DagModel:

def _make_asset_model(
    scheduled_dags: dict[str, list[str]] | None = None,
    allow_global: dict[str, bool] | None = None,
) -> AssetModel:
    """
    Create a mock AssetModel with one mock schedule reference per consumer DAG.

    :param scheduled_dags: mapping of dag_id -> allow_producer_teams for each consumer reference.
    :param allow_global: mapping of dag_id -> allow_global_producers for each consumer reference.
        A dag_id missing from this mapping gets ``True``.
    :return: a ``mock.Mock(spec=AssetModel)`` whose ``scheduled_dags`` is the list of mock references.
    """
    allow_global = allow_global or {}
    model = mock.Mock(spec=AssetModel)
    # Only dag_ids present in scheduled_dags produce a reference; allow_global is
    # consulted per dag_id and never adds references of its own.
    model.scheduled_dags = [
        mock.Mock(
            dag_id=dag_id,
            allow_producer_teams=teams,
            allow_global_producers=allow_global.get(dag_id, True),
        )
        for dag_id, teams in (scheduled_dags or {}).items()
    ]
    return model
Expand Down Expand Up @@ -534,3 +541,51 @@ def test_both_teamless_allowed(self):
)

assert dag in result

@conf_vars({("core", "multi_team"): "true"})
def test_teamless_dag_producer_blocked_when_allow_global_false(self):
    """A teamless DAG producer must not queue a consumer whose reference has allow_global_producers=False."""
    consumer = _make_dag("dag1")
    asset_model = _make_asset_model(scheduled_dags={"dag1": []}, allow_global={"dag1": False})
    team_lookup = mock.patch.object(
        DagModel, "get_dag_id_to_team_name_mapping", return_value={"dag1": "team_b"}
    )

    with team_lookup:
        # Empty source_teams with source_is_api=False models a teamless DAG producer.
        queued = AssetManager._filter_dags_by_team(
            dags_to_queue={consumer},
            source_teams=set(),
            asset_model=asset_model,
            source_is_api=False,
            session=mock.Mock(),
        )

    assert consumer not in queued

@conf_vars({("core", "multi_team"): "true"})
def test_teamless_dag_producer_allowed_when_allow_global_true(self):
    """A teamless DAG producer queues a consumer whose reference has allow_global_producers=True (default)."""
    consumer = _make_dag("dag1")
    asset_model = _make_asset_model(scheduled_dags={"dag1": []}, allow_global={"dag1": True})
    team_lookup = mock.patch.object(
        DagModel, "get_dag_id_to_team_name_mapping", return_value={"dag1": "team_b"}
    )

    with team_lookup:
        # Empty source_teams with source_is_api=False models a teamless DAG producer.
        queued = AssetManager._filter_dags_by_team(
            dags_to_queue={consumer},
            source_teams=set(),
            asset_model=asset_model,
            source_is_api=False,
            session=mock.Mock(),
        )

    assert consumer in queued

@conf_vars({("core", "multi_team"): "true"})
def test_teamless_api_user_not_affected_by_allow_global(self):
    """allow_global does not change API behavior: a teamless API user still cannot reach a team-bound consumer."""
    team_bound_dag = _make_dag("dag1")
    asset_model = _make_asset_model(scheduled_dags={"dag1": []}, allow_global={"dag1": True})
    team_lookup = mock.patch.object(
        DagModel, "get_dag_id_to_team_name_mapping", return_value={"dag1": "team_b"}
    )

    with team_lookup:
        # source_is_api=True is the only difference from the teamless DAG-producer case.
        queued = AssetManager._filter_dags_by_team(
            dags_to_queue={team_bound_dag},
            source_teams=set(),
            asset_model=asset_model,
            source_is_api=True,
            session=mock.Mock(),
        )

    assert team_bound_dag not in queued
10 changes: 6 additions & 4 deletions airflow-core/tests/unit/dag_processing/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,16 @@ def per_test(self) -> Generator:
self.clean_db()

@pytest.mark.usefixtures("testing_dag_bundle")
def test_sync_assets_preserves_allow_producer_teams_from_other_bundle(self, dag_maker, session):
def test_sync_assets_preserves_access_control_from_other_bundle(self, dag_maker, session):
"""When a producer bundle (without access_control) is synced after a consumer bundle
(with access_control), the stored allow_producer_teams must not be wiped out."""
(with access_control), the stored access control fields must not be wiped out."""
from airflow.models.asset import DagScheduleAssetReference
from airflow.sdk import AssetAccessControl

# First sync: consumer bundle sets access_control on the asset.
consumer_asset = Asset(
"shared_asset",
access_control=AssetAccessControl(producer_teams=["team1", "team2"]),
access_control=AssetAccessControl(producer_teams=["team1", "team2"], allow_global=False),
)
with dag_maker(dag_id="consumer_dag", schedule=[consumer_asset]) as consumer_dag:
EmptyOperator(task_id="mytask")
Expand All @@ -170,6 +170,7 @@ def test_sync_assets_preserves_allow_producer_teams_from_other_bundle(self, dag_
select(DagScheduleAssetReference).where(DagScheduleAssetReference.dag_id == "consumer_dag")
)
assert ref.allow_producer_teams == ["team1", "team2"]
assert ref.allow_global_producers is False

# Second sync: producer bundle references the same asset WITHOUT access_control.
producer_asset = Asset("shared_asset")
Expand All @@ -182,9 +183,10 @@ def test_sync_assets_preserves_allow_producer_teams_from_other_bundle(self, dag_
asset_op.sync_assets(session=session)
session.flush()

# Consumer's allow_producer_teams must still be preserved.
# Consumer's access control must still be preserved.
session.expire(ref)
assert ref.allow_producer_teams == ["team1", "team2"]
assert ref.allow_global_producers is False

@pytest.mark.parametrize(
("is_active", "is_paused", "expected_num_triggers"),
Expand Down
Loading