diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst index 8e5cf356394e0..e9f1eb681857d 100644 --- a/airflow-core/docs/authoring-and-scheduling/assets.rst +++ b/airflow-core/docs/authoring-and-scheduling/assets.rst @@ -402,10 +402,10 @@ As mentioned in :ref:`Fetching information from previously emitted asset events< events = inlet_events[AssetAlias("example-alias")] last_row_count = events[-1].extra["row_count"] -.. _asset_allow_producer_teams: +.. _asset_access_control: -Cross-team asset event filtering with ``allow_producer_teams`` --------------------------------------------------------------- +Cross-team asset event filtering with ``access_control`` +-------------------------------------------------------- .. versionadded:: 3.3.0 @@ -413,40 +413,82 @@ When :doc:`Multi-Team mode ` is enabled, asset events membership. By default, a consuming Dag only receives asset events produced by Dags within the same team or by global (teamless) Dags. This prevents unintended cross-team triggers. -To allow specific other teams to produce events that trigger your Dag, use the ``allow_producer_teams`` parameter -on the ``Asset`` definition: +To configure cross-team access, use the ``access_control`` parameter on the ``Asset`` definition with an +``AssetAccessControl`` instance: .. code-block:: python - from airflow.sdk import Asset + from airflow.sdk import Asset, AssetAccessControl shared_data = Asset( name="my_data", uri="s3://bucket/shared/data.csv", - allow_producer_teams=["team_analytics", "team_ml"], + access_control=AssetAccessControl( + producer_teams=["team_analytics", "team_ml"], + ), ) In this example, asset events produced by Dags belonging to ``team_analytics`` or ``team_ml`` will be accepted by any consuming Dag that schedules on ``shared_data``, in addition to events from the consuming Dag's own team. +``AssetAccessControl`` parameters +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The ``AssetAccessControl`` class accepts the following parameters: + +- **producer_teams** (``list[str]``, default ``[]``): List of team names allowed to produce events + consumed by this asset's consumers, in addition to the consumer's own team. +- **allow_global** (``bool``, default ``True``): Whether teamless (global) Dag producers can trigger + consumers of this asset. When set to ``False``, only Dags with an explicit team association + (same team or listed in ``producer_teams``) can trigger consumers. + +Blocking global producers +~~~~~~~~~~~~~~~~~~~~~~~~~ + +By default, global (teamless) Dags can trigger any consumer. In strict team isolation scenarios, you +may want to block teamless producers: + +.. code-block:: python + + from airflow.sdk import Asset, AssetAccessControl + + strict_data = Asset( + name="strict_data", + uri="s3://bucket/strict/data.csv", + access_control=AssetAccessControl( + producer_teams=["team_analytics"], + allow_global=False, + ), + ) + +With ``allow_global=False``, only Dags belonging to the consumer's own team or to ``team_analytics`` can +trigger consumers of ``strict_data``. Teamless Dag producers are blocked. + +.. note:: + + The ``allow_global`` flag only affects Dag producers. Teamless API users are always restricted to + triggering teamless consumers only, regardless of this setting. + Default behavior ~~~~~~~~~~~~~~~~ -When ``allow_producer_teams`` is not specified (or set to an empty list), the default same-team filtering applies. -The rules depend on whether the producer and consumer have a team association: +When ``access_control`` is not specified, a default ``AssetAccessControl()`` is used (empty +``producer_teams`` and ``allow_global=True``). The rules depend on whether the producer and consumer +have a team association: - **Both have the same team**: The event is always delivered. - **Producer has a team, consumer has a different team**: The event is blocked (unless the - producer's team is in the asset's ``allow_producer_teams``). -- **Producer has no team (global Dag)**: The event is delivered to all consumers, regardless of - the consumer's team. Global Dags act as shared infrastructure that any team can depend on. + producer's team is in the asset's ``producer_teams``). +- **Producer has no team (global Dag)**: The event is delivered to all consumers whose asset has + ``allow_global=True`` (the default). Global Dags act as shared infrastructure that any team can + depend on. - **Consumer has no team (global Dag)**: The consumer accepts events from any source, regardless of the producer's team. Teamless consumers act as shared infrastructure that any team can feed into. - **Neither has a team**: The event is delivered (both are global). -When Multi-Team mode is disabled, ``allow_producer_teams`` is ignored and all asset events are delivered to all +When Multi-Team mode is disabled, ``access_control`` is ignored and all asset events are delivered to all consuming Dags, preserving backward compatibility. Asset partitions diff --git a/airflow-core/docs/core-concepts/multi-team.rst b/airflow-core/docs/core-concepts/multi-team.rst index 88a99e2be4077..eb6371701b69f 100644 --- a/airflow-core/docs/core-concepts/multi-team.rst +++ b/airflow-core/docs/core-concepts/multi-team.rst @@ -517,26 +517,41 @@ Default Behavior By default, a consuming Dag only receives asset events from producers within the same team or from Dags with no team association, i.e. global Dags. -Cross-Team Opt-In with ``allow_producer_teams`` -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Cross-Team Opt-In with ``access_control`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ To allow specific teams to produce events that trigger consumers on a given asset from another team, use the -``allow_producer_teams`` parameter on the ``Asset`` definition: +``access_control`` parameter on the ``Asset`` definition with an ``AssetAccessControl`` instance: .. code-block:: python - from airflow.sdk import Asset + from airflow.sdk import Asset, AssetAccessControl shared_data = Asset( name="shared_data", uri="s3://bucket/shared/data.csv", - allow_producer_teams=["team_analytics", "team_ml"], + access_control=AssetAccessControl( + producer_teams=["team_analytics", "team_ml"], + ), ) With this configuration, asset events from ``team_analytics`` or ``team_ml`` will be accepted by any consuming Dag that schedules on ``shared_data``, in addition to events from the consumer's own team. -See :ref:`Cross-team asset event filtering with allow_producer_teams ` in the Assets +To block global (teamless) Dag producers from triggering consumers, set ``allow_global=False``: + +.. code-block:: python + + strict_data = Asset( + name="strict_data", + uri="s3://bucket/strict/data.csv", + access_control=AssetAccessControl( + producer_teams=["team_analytics"], + allow_global=False, + ), + ) + +See :ref:`Cross-team asset event filtering with access_control ` in the Assets documentation for usage details and validation rules. Behavioral Rules @@ -546,79 +561,98 @@ The following table describes the complete filtering logic: .. list-table:: :header-rows: 1 - :widths: 20 20 20 15 25 + :widths: 15 15 18 14 13 25 * - Producer - Consumer - - ``allow_producer_teams`` + - ``producer_teams`` + - ``allow_global`` - Result - Reason * - Team A (DAG) - Team A - (any) + - (any) - ✅ Allowed - Same team * - Team A (DAG) - Team B - ``[]`` + - (any) - ❌ Blocked - Different team, no opt-in * - Team A (DAG) - Team B - ``["team_a"]`` + - (any) - ✅ Allowed - Cross-team opt-in * - (no team, DAG) - Team B - (any) + - ``True`` - ✅ Allowed - - Global producer + - Global producer, allow_global is True + * - (no team, DAG) + - Team B + - (any) + - ``False`` + - ❌ Blocked + - Global producer blocked by allow_global=False * - Team A (DAG) - (no team) - (any) + - (any) - ✅ Allowed - Teamless consumer accepts events from any DAG producer * - (no team, DAG) - (no team) - (any) + - (any) - ✅ Allowed - Both global * - Team A (API) - Team A - (any) + - (any) - ✅ Allowed - Same team * - Team A (API) - Team B - ``["team_a"]`` + - (any) - ✅ Allowed - Cross-team opt-in * - Team A (API) - (no team) - (any) + - (any) - ✅ Allowed - Teamless consumer accepts events from any source * - (no team, API) - Team B - (any) + - (any) - ❌ Blocked - Teamless API user cannot trigger team-bound consumer * - (no team, API) - (no team) - (any) + - (any) - ✅ Allowed - Both global Key rules: - **Same team**: Always allowed. -- **Global (teamless) DAG producer**: Triggers all consumers regardless of team. +- **Global (teamless) DAG producer with** ``allow_global=True``: Triggers all consumers regardless of team. +- **Global (teamless) DAG producer with** ``allow_global=False``: Blocked from triggering team-bound consumers. - **Teamless API user**: Can only trigger teamless consumers. Unlike a teamless DAG — which is deployed by a platform operator and intentionally shared — an API user without a team has no verified team affiliation, so their events are restricted to teamless consumers to prevent unscoped access to team-bound pipelines. - **Teamless consumer**: Accepts events from any source (DAG or API), regardless of team. -- **Cross-team via** ``allow_producer_teams``: Allowed when the producer's team is listed in the asset's ``allow_producer_teams``. +- **Cross-team via** ``producer_teams``: Allowed when the producer's team is listed in the asset's ``producer_teams``. - **Multi-Team disabled**: All filtering is skipped; existing behavior is preserved. API-Triggered Events diff --git a/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py b/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py index 0dc71db8d2002..d77eeb9d2b7ed 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py +++ b/airflow-core/src/airflow/example_dags/example_asset_allow_teams.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. """ -Example DAG demonstrating cross-team asset triggering with ``allow_producer_teams``. +Example DAG demonstrating cross-team asset triggering with ``AssetAccessControl``. When Multi-Team mode is enabled (``[core] multi_team = True``), asset events are filtered by team membership. By default, a consuming DAG only receives events from DAGs within the same team. @@ -23,9 +23,10 @@ Usage: - ``team_analytics_producer`` (belonging to ``team_analytics``) produces events on ``shared_data``. - ``team_ml_consumer`` (belonging to ``team_ml``) consumes ``shared_data``. - - Because ``shared_data`` has ``allow_producer_teams=["team_analytics"]``, events from ``team_analytics`` - are accepted by ``team_ml_consumer``. - - Without ``allow_producer_teams``, the cross-team event would be blocked. + - Because ``shared_data`` has ``access_control=AssetAccessControl(producer_teams=["team_analytics"], + allow_global=False)``, events from ``team_analytics`` are accepted by ``team_ml_consumer``, while + teamless (global) DAG producers are blocked. + - Without ``access_control``, the cross-team event would be blocked. """ from __future__ import annotations @@ -33,14 +34,17 @@ import pendulum from airflow.providers.standard.operators.bash import BashOperator -from airflow.sdk import DAG, Asset +from airflow.sdk import DAG, Asset, AssetAccessControl -# [START asset_allow_producer_teams] -# Define an asset that accepts events from team_analytics in addition to the consumer's own team. +# [START asset_access_control] +# Define an asset that accepts events from team_analytics but blocks global (teamless) producers. shared_data = Asset( name="shared_data", uri="s3://data-lake/shared/output.csv", - allow_producer_teams=["team_analytics"], + access_control=AssetAccessControl( + producer_teams=["team_analytics"], + allow_global=False, + ), ) # Producer DAG — belongs to team_analytics (via its DAG bundle configuration). @@ -60,7 +64,7 @@ # Consumer DAG — belongs to team_ml (via its DAG bundle configuration). # This DAG is triggered when shared_data is updated. Because shared_data has -# allow_producer_teams=["team_analytics"], events from team_analytics are accepted. +# access_control with producer_teams=["team_analytics"], events from team_analytics are accepted. with DAG( dag_id="team_ml_consumer", start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), @@ -72,4 +76,4 @@ task_id="consume_shared_data", bash_command="echo 'Consuming shared data from team_analytics'", ) -# [END asset_allow_producer_teams] +# [END asset_access_control]