Skip to content

remove N+1 db queries for team names#61471

Open
steveahnahn wants to merge 2 commits intoapache:mainfrom
steveahnahn:batch-team-lookup
Open

remove N+1 db queries for team names#61471
steveahnahn wants to merge 2 commits intoapache:mainfrom
steveahnahn:batch-team-lookup

Conversation

@steveahnahn
Copy link
Contributor

Summary

Optimize multi-team executor resolution in the scheduler by batching dag_id -> team_name lookups and passing the resolved team_name into executor selection, eliminating per-task DB queries when core.multi_team=true.
With multi-team enabled, the scheduler can process many task instances per loop. Previously, team resolution could trigger repeated database lookups (effectively N queries for N task instances) when selecting executors. This PR reduces that overhead by resolving team names once per unique DAG ID and reusing the results.

  • Adds a batched lookup helper to resolve team names for multiple DAG IDs in one query
  • Preserves legacy behavior when core.multi_team=false (no additional queries / logic).
  • Add test_multi_team_executor_to_tis_batch_optimization to assert _executor_to_tis() performs a single batched query and does not call _get_task_team_name().

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Feb 4, 2026
@vincbeck vincbeck requested a review from o-nikolas February 4, 2026 21:59
Comment on lines +2934 to +2938
if (
executor := self._try_to_load_executor(
ti, session, team_name=dag_id_to_team_name.get(ti.dag_id, NOTSET)
)
) is None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let’s split this to a separate = statement and then if executor is None, this is too long IMO.

Comment on lines +2885 to +2888
dag_id_to_team_name: dict[str, str | None] = {}
if conf.getboolean("core", "multi_team"):
unique_dag_ids = {ti.dag_id for ti in task_instances_without_heartbeats}
dag_id_to_team_name = self._get_team_names_for_dag_ids(unique_dag_ids, session)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
dag_id_to_team_name: dict[str, str | None] = {}
if conf.getboolean("core", "multi_team"):
unique_dag_ids = {ti.dag_id for ti in task_instances_without_heartbeats}
dag_id_to_team_name = self._get_team_names_for_dag_ids(unique_dag_ids, session)
if conf.getboolean("core", "multi_team"):
unique_dag_ids = {ti.dag_id for ti in task_instances_without_heartbeats}
dag_id_to_team_name = self._get_team_names_for_dag_ids(unique_dag_ids, session)
else:
dag_id_to_team_name = {}

Copy link
Member

@uranusjr uranusjr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of minor suggestions, overrall logic is good to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants