Skip to content

Move allow_producer_teams from asset model to DagScheduleAssetReference#66487

Merged
vincbeck merged 1 commit into
apache:mainfrom
aws-mwaa:vincbeck/allow_producer_teams
May 8, 2026
Merged

Move allow_producer_teams from asset model to DagScheduleAssetReference#66487
vincbeck merged 1 commit into
apache:mainfrom
aws-mwaa:vincbeck/allow_producer_teams

Conversation

@vincbeck
Copy link
Copy Markdown
Contributor

@vincbeck vincbeck commented May 6, 2026

Storing the allow_producer_teams parameter on the AssetModel was bad design decision. The same asset can be (and is expected to be) defined in multiple Dags across different bundles. For example:

Producer Dag (team1 bundle):

   with DAG(
       dag_id="example_producer_team1",
       schedule="@once",
       catchup=False,
   ):
       task = EmptyOperator(task_id="produce_event", outlets=[Asset("my_asset")])
       chain(task)

Consumer Dag (team2 bundle):

   asset = Asset("my_asset", allow_producer_teams=["team1", "team2"])

   with DAG(
       dag_id="example_consumer_team2",
       schedule=[asset],
       catchup=False,
   ):
       task = EmptyOperator(task_id="dummy_task")

The asset my_asset is referenced in both Dags. The producer does not define allow_producer_teams, but the consumer does. Since each bundle is processed independently, whichever bundle is parsed last overwrites the asset row in the database. In other words, the producer bundle could wipe out the consumer's allow_producer_teams.

Even though the asset is declared in multiple Dags, there is only one row in the asset table. Storing allow_producer_teams there was not the right choice because it is a consumer-side declaration: it controls which teams are allowed to trigger this specific consumer, not a global property of the asset itself.

Solution: Move allow_producer_teams to the dag_schedule_asset_reference table, which represents the relationship between a consumer Dag and the asset it schedules on.


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Comment thread airflow-core/tests/unit/assets/test_manager.py Outdated
Comment thread airflow-core/tests/unit/dag_processing/test_collection.py Outdated
@vincbeck vincbeck force-pushed the vincbeck/allow_producer_teams branch from 002457d to fa9c421 Compare May 7, 2026 14:15
@vincbeck vincbeck force-pushed the vincbeck/allow_producer_teams branch from fa9c421 to 35d7ee1 Compare May 7, 2026 14:57
@vincbeck vincbeck requested a review from Lee-W May 7, 2026 19:10
@vincbeck vincbeck merged commit 98c4b83 into apache:main May 8, 2026
191 of 192 checks passed
@vincbeck vincbeck deleted the vincbeck/allow_producer_teams branch May 8, 2026 12:19
arpitrathore pushed a commit to arpitrathore/airflow that referenced this pull request May 9, 2026
jason810496 pushed a commit to jason810496/airflow that referenced this pull request May 11, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants