Skip to content

Cache Celery apps when publishing workloads#67127

Open
anmolxlight wants to merge 2 commits into
apache:mainfrom
anmolxlight:fix/67123-celery-app-cache
Open

Cache Celery apps when publishing workloads#67127
anmolxlight wants to merge 2 commits into
apache:mainfrom
anmolxlight:fix/67123-celery-app-cache

Conversation

@anmolxlight
Copy link
Copy Markdown
Contributor

@anmolxlight anmolxlight commented May 18, 2026

What

  • cache the Celery app used by send_workload_to_executor inside each publisher subprocess
  • key the cache by team_name so multi-team configurations still get isolated app instances
  • add regression coverage proving repeated publishes for the same team reuse the app while different teams remain separated

Why

send_workload_to_executor currently creates a fresh Celery() app for every publish. Each fresh app loses Celery's lazy backend cache, so apply_async() repeatedly performs backend resolution and entry point scanning. On large deployments this can push task publishing past [celery] operation_timeout.

Caching the app per subprocess preserves the post-AIP-67 behavior of constructing apps inside publisher subprocesses while restoring per-process amortization.

fixes #67123

Tests

  • uv run --project providers/celery pytest providers/celery/tests/unit/celery/executors/test_celery_executor.py -q
  • uv run ruff check providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py providers/celery/tests/unit/celery/executors/test_celery_executor.py
  • uv run ruff format --check providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py providers/celery/tests/unit/celery/executors/test_celery_executor.py

@seanmuth
Copy link
Copy Markdown
Contributor

Drive-by from #67123 — confirming this matches the fix I proposed there. Both shapes (@lru_cache on a thin wrapper keyed on team_name) are functionally equivalent; preserves AIP-67 isolation while restoring per-subprocess amortization of the result-backend resolution that pre-3.16.0 was getting from the module-level app singleton.

Sharing some empirical numbers I gathered while reproducing #67123, in case they're useful for the reviewer:

Per-publish overhead, measured as create_celery_app(conf); _ = app.backend (the path inside [celery] operation_timeout). Astro Runtime 13.4.0 (Airflow 2.11.0), 1 vCPU / 2 GiB scheduler, 656 installed distributions, 100 iterations.

Configuration min p50 p95 p99 max iter/s
Idle, no load 48ms 51ms 67ms 82ms 86ms 18.8
Idle + 1.5 GiB anon allocation 48 52 89 95 100 17.5
5 modest Dags with mapped tasks running 52 89 232 363 385 9.6
Same Dag load + 1.0 GiB anon allocation 51 112 498 698 718 5.7

Reference: a real production deployment with the regression (~682 distributions, ~640 active Dags, periodic scheduler OOMs) shows max=558ms with consistent Task Timeout Error for Task log entries firing at the default 1.0s operation_timeout. Downgrading to providers-celery 3.15.x in the same image collapses the per-publish path to microseconds (singleton's backend is already resolved), confirming the regression's scope.

The takeaway for sizing the fix's expected impact: with this cache in place, the per-subprocess first-publish cost is paid once per team_name and then amortized over the subprocess's lifetime, restoring the pre-3.16.0 behavior. apply_async afterwards is just the broker round-trip plus a cached attribute access.

A couple of small things you might want to consider rolling in (entirely optional — happy if you ignore):

  1. A docstring on _get_celery_app_for_workload linking to the issue and briefly noting why the cache exists — future readers wondering about the indirection will appreciate the breadcrumb. Without context, "subprocess-local Celery app cached by team name for task publishing" reads like incidental optimization rather than a regression fix.

  2. A third unit test that exercises _get_celery_app_for_workload directly (without going through send_workload_to_executor) — asserting that consecutive calls for the same team return the same object, and that different team names return distinct objects. The two existing tests cover the end-to-end behavior well; a direct-on-the-cache test would catch refactors that move work out of _get_celery_app_for_workload without breaking the integration path.

Comparison point in case it's useful: PR #61798 implemented the same AIP-67 multi-team feature for KubernetesExecutor without the per-publish cost because that executor's design naturally holds the team-aware KubeConfig once at __init__. Worth noting that the per-publish reconstruction wasn't an inherent AIP-67 requirement — it was an artifact of the ProcessPoolExecutor.submit(...) shape the Celery publisher pool already had. This PR effectively brings the Celery side back in line with that pattern.

Either way, the fix is correct and I'd love to see it merged. Thanks for picking this up so quickly.


Drafted-by: Claude Code (Opus 4.7); reviewed by @seanmuth before posting

return celery_app


@lru_cache(maxsize=8)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 8?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed. Replaced the arbitrary maxsize=8 with @cache, so configured teams are cached without eviction in the publishing process.


@lru_cache(maxsize=8)
def _get_celery_app_for_workload(team_name: str | None) -> Celery:
"""Return a subprocess-local Celery app cached by team name for task publishing."""
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "subprocess-local" claim only holds when _send_workloads_to_celery actually uses the ProcessPoolExecutor branch. In celery_executor.py:243-245, the single-workload (or sync_parallelism=1) path runs send_workload_to_executor inline via map(...) in the scheduler process. In that case this cache lives in the scheduler itself and keeps the cached Celery apps' broker connections open there too. Worth either updating the docstring to reflect "scheduler process or publisher subprocess, depending on path", or being explicit that this is intentional.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Updated the docstring to describe both scheduler-inline and publisher-subprocess execution paths, and made the cache process-local rather than subprocess-specific.

def _get_celery_app_for_workload(team_name: str | None) -> Celery:
"""Return a subprocess-local Celery app cached by team name for task publishing."""
if TYPE_CHECKING:
_conf: ExecutorConf | AirflowConfigParser
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if TYPE_CHECKING: _conf: ExecutorConf | AirflowConfigParser is a copy-paste leftover from the old send_workload_to_executor. The annotation never escapes this function and _conf already gets a concrete type from the if/else assignment below, so this whole two-line block can be dropped.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Removed the leftover TYPE_CHECKING annotation block from _get_celery_app_for_workload.

def clear_cached_workload_celery_apps():
celery_executor_utils._get_celery_app_for_workload.cache_clear()
yield
celery_executor_utils._get_celery_app_for_workload.cache_clear()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two new tests cover team-a reuse and team-a vs team-b separation, but not team_name=None, which is the path the vast majority of deployments hit (no multi-team config). A third parametrized case asserting None also hits the cache would catch a regression where lru_cache started treating None specially.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Added direct cache tests for team_name=None, same-team reuse, and distinct team names.

@anmolxlight
Copy link
Copy Markdown
Contributor Author

Rolled in the optional follow-ups: documented why the cache exists and where it lives, added direct cache coverage including team_name=None, removed the leftover type-only block, and changed the cache to avoid an arbitrary team limit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Celery executor: per-publish Celery app reconstruction (since providers-celery 3.16.0) causes operation_timeout breaches

3 participants