Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 55 additions & 13 deletions airflow-core/docs/authoring-and-scheduling/assets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -402,51 +402,93 @@ As mentioned in :ref:`Fetching information from previously emitted asset events<
events = inlet_events[AssetAlias("example-alias")]
last_row_count = events[-1].extra["row_count"]

.. _asset_allow_producer_teams:
.. _asset_access_control:

Cross-team asset event filtering with ``allow_producer_teams``
--------------------------------------------------------------
Cross-team asset event filtering with ``access_control``
--------------------------------------------------------

.. versionadded:: 3.3.0

When :doc:`Multi-Team mode </core-concepts/multi-team>` is enabled, asset events are filtered by team
membership. By default, a consuming Dag only receives asset events produced by Dags within the same team
or by global (teamless) Dags. This prevents unintended cross-team triggers.

To allow specific other teams to produce events that trigger your Dag, use the ``allow_producer_teams`` parameter
on the ``Asset`` definition:
To configure cross-team access, use the ``access_control`` parameter on the ``Asset`` definition with an
``AssetAccessControl`` instance:

.. code-block:: python

from airflow.sdk import Asset
from airflow.sdk import Asset, AssetAccessControl

shared_data = Asset(
name="my_data",
uri="s3://bucket/shared/data.csv",
allow_producer_teams=["team_analytics", "team_ml"],
access_control=AssetAccessControl(
producer_teams=["team_analytics", "team_ml"],
),
)

In this example, asset events produced by Dags belonging to ``team_analytics`` or ``team_ml`` will be
accepted by any consuming Dag that schedules on ``shared_data``, in addition to events from the consuming
Dag's own team.

``AssetAccessControl`` parameters
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``AssetAccessControl`` class accepts the following parameters:

- **producer_teams** (``list[str]``, default ``[]``): List of team names allowed to produce events
consumed by this asset's consumers, in addition to the consumer's own team.
- **allow_global** (``bool``, default ``True``): Whether teamless (global) Dag producers can trigger
Comment thread
o-nikolas marked this conversation as resolved.
consumers of this asset. When set to ``False``, only Dags with an explicit team association
(same team or listed in ``producer_teams``) can trigger consumers.

Blocking global producers
~~~~~~~~~~~~~~~~~~~~~~~~~

By default, global (teamless) Dags can trigger any consumer. In strict team isolation scenarios, you
may want to block teamless producers:

.. code-block:: python

from airflow.sdk import Asset, AssetAccessControl

strict_data = Asset(
name="strict_data",
uri="s3://bucket/strict/data.csv",
access_control=AssetAccessControl(
producer_teams=["team_analytics"],
allow_global=False,
),
)

With ``allow_global=False``, only Dags belonging to the consumer's own team or to ``team_analytics`` can
trigger consumers of ``strict_data``. Teamless Dag producers are blocked.

.. note::

The ``allow_global`` flag only affects Dag producers. Teamless API users are always restricted to
triggering teamless consumers only, regardless of this setting.

Default behavior
~~~~~~~~~~~~~~~~

When ``allow_producer_teams`` is not specified (or set to an empty list), the default same-team filtering applies.
The rules depend on whether the producer and consumer have a team association:
When ``access_control`` is not specified, a default ``AssetAccessControl()`` is used (empty
``producer_teams`` and ``allow_global=True``). The rules depend on whether the producer and consumer
have a team association:

- **Both have the same team**: The event is always delivered.
- **Producer has a team, consumer has a different team**: The event is blocked (unless the
producer's team is in the asset's ``allow_producer_teams``).
- **Producer has no team (global Dag)**: The event is delivered to all consumers, regardless of
the consumer's team. Global Dags act as shared infrastructure that any team can depend on.
producer's team is in the asset's ``producer_teams``).
- **Producer has no team (global Dag)**: The event is delivered to all consumers whose asset has
``allow_global=True`` (the default). Global Dags act as shared infrastructure that any team can
depend on.
- **Consumer has no team (global Dag)**: The consumer accepts events from any source,
regardless of the producer's team. Teamless consumers act as shared infrastructure that any
team can feed into.
- **Neither has a team**: The event is delivered (both are global).

When Multi-Team mode is disabled, ``allow_producer_teams`` is ignored and all asset events are delivered to all
When Multi-Team mode is disabled, ``access_control`` is ignored and all asset events are delivered to all
consuming Dags, preserving backward compatibility.

Asset partitions
Expand Down
56 changes: 45 additions & 11 deletions airflow-core/docs/core-concepts/multi-team.rst
Original file line number Diff line number Diff line change
Expand Up @@ -517,26 +517,41 @@ Default Behavior

By default, a consuming Dag only receives asset events from producers within the same team or from Dags with no team association, i.e. global Dags.

Cross-Team Opt-In with ``allow_producer_teams``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Cross-Team Opt-In with ``access_control``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To allow specific teams to produce events that trigger consumers on a given asset from another team, use the
``allow_producer_teams`` parameter on the ``Asset`` definition:
``access_control`` parameter on the ``Asset`` definition with an ``AssetAccessControl`` instance:

.. code-block:: python

from airflow.sdk import Asset
from airflow.sdk import Asset, AssetAccessControl

shared_data = Asset(
name="shared_data",
uri="s3://bucket/shared/data.csv",
allow_producer_teams=["team_analytics", "team_ml"],
access_control=AssetAccessControl(
producer_teams=["team_analytics", "team_ml"],
),
)

With this configuration, asset events from ``team_analytics`` or ``team_ml`` will be accepted by any
consuming Dag that schedules on ``shared_data``, in addition to events from the consumer's own team.

See :ref:`Cross-team asset event filtering with allow_producer_teams <asset_allow_producer_teams>` in the Assets
To block global (teamless) Dag producers from triggering consumers, set ``allow_global=False``:

.. code-block:: python

strict_data = Asset(
name="strict_data",
uri="s3://bucket/strict/data.csv",
access_control=AssetAccessControl(
producer_teams=["team_analytics"],
allow_global=False,
),
)

See :ref:`Cross-team asset event filtering with access_control <asset_access_control>` in the Assets
documentation for usage details and validation rules.

Behavioral Rules
Expand All @@ -546,79 +561,98 @@ The following table describes the complete filtering logic:

.. list-table::
:header-rows: 1
:widths: 20 20 20 15 25
:widths: 15 15 18 14 13 25

* - Producer
- Consumer
- ``allow_producer_teams``
- ``producer_teams``
- ``allow_global``
- Result
- Reason
* - Team A (DAG)
- Team A
- (any)
- (any)
- ✅ Allowed
- Same team
* - Team A (DAG)
- Team B
- ``[]``
- (any)
- ❌ Blocked
- Different team, no opt-in
* - Team A (DAG)
- Team B
- ``["team_a"]``
- (any)
- ✅ Allowed
- Cross-team opt-in
* - (no team, DAG)
- Team B
- (any)
- ``True``
- ✅ Allowed
- Global producer
- Global producer, allow_global is True
* - (no team, DAG)
- Team B
- (any)
- ``False``
- ❌ Blocked
- Global producer blocked by allow_global=False
* - Team A (DAG)
- (no team)
- (any)
- (any)
- ✅ Allowed
- Teamless consumer accepts events from any DAG producer
* - (no team, DAG)
- (no team)
- (any)
- (any)
- ✅ Allowed
- Both global
* - Team A (API)
- Team A
- (any)
- (any)
- ✅ Allowed
- Same team
* - Team A (API)
- Team B
- ``["team_a"]``
- (any)
- ✅ Allowed
- Cross-team opt-in
* - Team A (API)
- (no team)
- (any)
- (any)
- ✅ Allowed
- Teamless consumer accepts events from any source
* - (no team, API)
- Team B
- (any)
- (any)
- ❌ Blocked
- Teamless API user cannot trigger team-bound consumer
* - (no team, API)
- (no team)
- (any)
- (any)
- ✅ Allowed
- Both global

Key rules:

- **Same team**: Always allowed.
- **Global (teamless) DAG producer**: Triggers all consumers regardless of team.
- **Global (teamless) DAG producer with** ``allow_global=True``: Triggers all consumers regardless of team.
- **Global (teamless) DAG producer with** ``allow_global=False``: Blocked from triggering team-bound consumers.
- **Teamless API user**: Can only trigger teamless consumers. Unlike a teamless DAG — which is
deployed by a platform operator and intentionally shared — an API user without a team has no
verified team affiliation, so their events are restricted to teamless consumers to
prevent unscoped access to team-bound pipelines.
- **Teamless consumer**: Accepts events from any source (DAG or API), regardless of team.
- **Cross-team via** ``allow_producer_teams``: Allowed when the producer's team is listed in the asset's ``allow_producer_teams``.
- **Cross-team via** ``producer_teams``: Allowed when the producer's team is listed in the asset's ``producer_teams``.
- **Multi-Team disabled**: All filtering is skipped; existing behavior is preserved.

API-Triggered Events
Expand Down
24 changes: 14 additions & 10 deletions airflow-core/src/airflow/example_dags/example_asset_allow_teams.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,36 @@
# specific language governing permissions and limitations
# under the License.
"""
Example DAG demonstrating cross-team asset triggering with ``allow_producer_teams``.
Example DAG demonstrating cross-team asset triggering with ``AssetAccessControl``.

When Multi-Team mode is enabled (``[core] multi_team = True``), asset events are filtered by team
membership. By default, a consuming DAG only receives events from DAGs within the same team.

Usage:
- ``team_analytics_producer`` (belonging to ``team_analytics``) produces events on ``shared_data``.
- ``team_ml_consumer`` (belonging to ``team_ml``) consumes ``shared_data``.
- Because ``shared_data`` has ``allow_producer_teams=["team_analytics"]``, events from ``team_analytics``
are accepted by ``team_ml_consumer``.
- Without ``allow_producer_teams``, the cross-team event would be blocked.
- Because ``shared_data`` has ``access_control=AssetAccessControl(producer_teams=["team_analytics"],
allow_global=False)``, events from ``team_analytics`` are accepted by ``team_ml_consumer``, while
teamless (global) DAG producers are blocked.
- Without ``access_control``, the cross-team event would be blocked.
"""

from __future__ import annotations

import pendulum

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, Asset
from airflow.sdk import DAG, Asset, AssetAccessControl

# [START asset_allow_producer_teams]
# Define an asset that accepts events from team_analytics in addition to the consumer's own team.
# [START asset_access_control]
# Define an asset that accepts events from team_analytics but blocks global (teamless) producers.
shared_data = Asset(
name="shared_data",
uri="s3://data-lake/shared/output.csv",
allow_producer_teams=["team_analytics"],
access_control=AssetAccessControl(
producer_teams=["team_analytics"],
allow_global=False,
),
)

# Producer DAG — belongs to team_analytics (via its DAG bundle configuration).
Expand All @@ -60,7 +64,7 @@

# Consumer DAG — belongs to team_ml (via its DAG bundle configuration).
# This DAG is triggered when shared_data is updated. Because shared_data has
# allow_producer_teams=["team_analytics"], events from team_analytics are accepted.
# access_control with producer_teams=["team_analytics"], events from team_analytics are accepted.
with DAG(
dag_id="team_ml_consumer",
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
Expand All @@ -72,4 +76,4 @@
task_id="consume_shared_data",
bash_command="echo 'Consuming shared data from team_analytics'",
)
# [END asset_allow_producer_teams]
# [END asset_access_control]
Loading