Configurable reduction in Celery chatter on Redis queues#8077
Configurable reduction in Celery chatter on Redis queues#8077
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
1 Skipped Deployment
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #8077 +/- ##
=======================================
Coverage 85.14% 85.15%
=======================================
Files 637 637
Lines 41936 41952 +16
Branches 4927 4931 +4
=======================================
+ Hits 35707 35723 +16
Misses 5121 5121
Partials 1108 1108 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Replace the single worker_disable_gossip_heartbeat_mingle flag with three independent options: worker_disable_heartbeat, worker_disable_gossip, and worker_disable_mingle. Each flag controls its own --without-* Celery argument independently. Add comprehensive tests covering all combinations of the three flags. Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
|
/code-review |
There was a problem hiding this comment.
Code Review: Configurable Celery Worker Flags
This is a clean, well-motivated change. The core logic in _run_celery_worker is clear and the new CelerySettings fields are well-described. The test suite is thorough for the three worker_disable_* flags and worker_concurrency. A few issues worth addressing:
worker_enable_remote_control is unimplemented (medium)
This is the most significant gap. The setting is declared in CelerySettings but no code in worker/__init__.py reads it. The other four new settings all have explicit handling; this one does not. It may work through the implicit celery_config.update(config.celery) merge in _create_celery (since worker_enable_remote_control is a valid Celery config key), but that path is undocumented and untested. See inline comment on celery_settings.py.
Changelog boilerplate not removed (minor)
The two template comment lines at the top of the changelog YAML were not cleaned up. See inline comment.
Log message fires before worker starts (minor)
"Worker started with ..." is logged before celery_app.worker_main() is called. See inline comment.
TestStartWorker hardcodes --concurrency=2 (minor)
The existing test_start_worker_with_arguments test asserts the literal string "--concurrency=2" without mocking CONFIG, creating a hidden dependency on the default value. See inline comment.
Unused fixture parameter (nit)
The mock_celery_config autouse fixture in both new test classes declares worker_main_mock as a parameter without using it. See inline comment.
🔬 Codegraph: connected (49082 nodes)
💡 Write /code-review in a comment to re-run this review.
| @@ -0,0 +1,7 @@ | |||
| # Copy this file and rename it to <pr_number>-<short-description>.yaml (e.g., 1234-add-user-endpoint.yaml) | |||
| # Fill in the required fields and delete this comment block | |||
There was a problem hiding this comment.
changelog/8077-individual-celery-worker-flags.yaml:1-2
The two template comment lines at the top of the file should be removed before merging:
# Copy this file and rename it to <pr_number>-<short-description>.yaml (e.g., 1234-add-user-endpoint.yaml)
# Fill in the required fields and delete this comment blockAlso, the description field only mentions three flags (worker_disable_heartbeat, worker_disable_gossip, worker_disable_mingle) but the PR also introduces worker_enable_remote_control and worker_concurrency. Consider updating the description to capture all five new settings.
| default=True, | ||
| description="If false, disables Celery remote control (pidbox). " | ||
| "Reduces Redis pub/sub connections. Disabling prevents remote worker management commands.", | ||
| ) |
There was a problem hiding this comment.
src/fides/config/celery_settings.py:36-40
worker_enable_remote_control is defined here but never explicitly consumed in src/fides/api/worker/__init__.py. Unlike the three worker_disable_* flags and worker_concurrency, there is no code path that reads this setting and acts on it.
It may work implicitly: _create_celery in src/fides/api/tasks/__init__.py calls celery_config.update(config.celery), which merges all CelerySettings fields into the Celery app config dict, and worker_enable_remote_control is a valid Celery configuration key. If that's the intended mechanism, a comment explaining this and a test asserting the behaviour would clarify the design intent and prevent accidental regression.
| without_flags.append("--without-mingle") | ||
| if without_flags: | ||
| argv += without_flags | ||
| logger.info( |
There was a problem hiding this comment.
src/fides/api/worker/__init__.py:57
The log message says "Worker started with ..." but it fires before celery_app.worker_main(argv=argv) is called on the next line — the worker hasn't started yet at this point. Consider changing the wording to "Starting worker with ..." or "Worker configured with ..." to avoid misleading operators reading the logs.
| """Tests for the individual worker_disable_heartbeat/gossip/mingle config flags.""" | ||
|
|
||
| @pytest.fixture(autouse=True) | ||
| def mock_celery_config(self, worker_main_mock: MagicMock): |
There was a problem hiding this comment.
tests/ctl/api/test_worker.py:141-150
The mock_celery_config autouse fixture declares worker_main_mock: MagicMock as a parameter but never uses it inside the fixture body. The same pattern appears in TestWorkerConcurrency. This parameter appears to be a leftover. If it was added to force ordering (ensure the class-level @patch is applied before the fixture runs), a brief comment explaining that intent would help; otherwise it can be dropped.
| worker_enable_remote_control: bool = Field( | ||
| default=True, | ||
| description="If false, disables Celery remote control (pidbox). " | ||
| "Reduces Redis pub/sub connections. Disabling prevents remote worker management commands.", |
There was a problem hiding this comment.
This field isn't consumed anywhere in the worker startup code. The other four flags (worker_disable_heartbeat, worker_disable_gossip, worker_disable_mingle, worker_concurrency) are all wired up in _run_celery_worker, but this one doesn't get read. Should it be wired into the worker argv, or is it planned for a follow-up? If it's not ready yet, might be cleaner to remove it from this PR and add it when the consumption logic lands.
There was a problem hiding this comment.
Removed, it was a holdover that didn't get used
| mock_celery_config.celery.worker_disable_heartbeat = True | ||
| mock_celery_config.celery.worker_disable_gossip = True | ||
| mock_celery_config.celery.worker_disable_mingle = False | ||
|
|
There was a problem hiding this comment.
nit: since each disable flag is independent (no interaction between them), the parametrized single-flag test plus the all-three test already prove correctness. The three pairwise combo tests (heartbeat+gossip, gossip+mingle, heartbeat+mingle) don't add new signal. Up to you whether you want to keep them for completeness or trim them down.
There was a problem hiding this comment.
Fixed in a subsequent commit
Description Of Changes
Replaces the single
worker_disable_gossip_heartbeat_mingleconfig flag with five independent, granular Celery worker settings:worker_disable_heartbeatFalse--without-heartbeatworker_disable_gossipFalse--without-gossipworker_disable_mingleFalse--without-mingleworker_enable_remote_controlTrueworker_concurrency2--concurrency=N(was hardcoded to 2)These are configurable via environment variables under the
FIDES__CELERY__prefix (e.g.FIDES__CELERY__WORKER_DISABLE_HEARTBEAT=true). The motivation is to give operators granular control over Celery's Redis chatter, which is a common workaround for celery/celery#7276 (BRPOP connection drop issues).Code Changes
src/fides/config/celery_settings.py— Added five newCelerySettingsfields with descriptionssrc/fides/api/worker/__init__.py— Refactored_run_celery_workerto read flags fromCONFIG.celeryand conditionally append--without-*args; made--concurrencyconfigurable instead of hardcodedchangelog/8077-individual-celery-worker-flags.yaml— Changelog entrySteps to Confirm
nox -s devstarts workers without--without-*flagsFIDES__CELERY__WORKER_DISABLE_HEARTBEAT=trueand confirm--without-heartbeatappears in worker startup logFIDES__CELERY__WORKER_CONCURRENCY=4and confirm worker starts with--concurrency=4nox -s "pytest(ops-unit)" -- tests/ctl/api/test_worker.pyto verify unit tests passPre-Merge Checklist
CHANGELOG.mdupdated