fix: add SerializedDagModel.get_count() that raises on None instead of silently returning 0#67572
Open
gingeekrishna wants to merge 13 commits into
Open
Conversation
…f silently returning 0 COUNT queries on aggregate columns always return an integer — a None result from session.scalar() signals a transient DB connectivity failure, not an empty table. Replacing `or 0` with an explicit RuntimeError ensures the failure surfaces loudly rather than being silently emitted as a zero count that triggers false on-call pages. * Add SerializedDagModel.get_count() classmethod with None-guard * Emit serialized_dag.count metric in dag_processing/manager.py emit_metrics(), wrapped in try/except so a DB error logs a warning but never kills the parse loop * Add unit tests for get_count() and emit_metrics() serialized_dag.count path Fixes: apache#66796
Contributor
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR adds visibility into how many serialized DAGs exist in the metadata DB by introducing a SerializedDagModel.get_count() helper and emitting a new serialized_dag.count gauge from the DAG processing manager, along with unit tests.
Changes:
- Add
SerializedDagModel.get_count()to retrieve the total row count fromserialized_dag. - Emit a new StatsD gauge
serialized_dag.countfromemit_metrics(), swallowing failures. - Add unit tests for
get_count()andemit_metrics()behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/models/serialized_dag.py | Adds get_count() classmethod with defensive None handling. |
| airflow-core/src/airflow/dag_processing/manager.py | Emits serialized_dag.count metric by querying the DB; logs on failure. |
| airflow-core/tests/unit/models/test_serialized_dag.py | Adds tests validating get_count() for empty/non-empty/None scalar results. |
| airflow-core/tests/unit/dag_processing/test_manager.py | Adds tests ensuring emit_metrics() emits the new gauge and swallows DB errors. |
- Narrow except clause from Exception to (RuntimeError, SQLAlchemyError) so unexpected programming errors still propagate - Replace Unicode em dash with ASCII hyphen in RuntimeError message - Fix test_emit_metrics_emits_serialized_dag_count: mock get_count instead of relying on dag_maker rows being visible across a new session boundary - Update pytest.raises match string to reflect the corrected error message
- Switch get_count() from scalar()+None-check to scalar_one(): COUNT(*) always returns exactly one row, so scalar_one() is the correct API and lets SQLAlchemyError surface naturally on DB failure instead of returning None - Narrow except clause to SQLAlchemyError only (RuntimeError no longer raised) - Add performance comment on the COUNT(*) round-trip in emit_metrics() - Fix test_emit_metrics_logs_and_swallows_db_error: assert log.exception is called with the expected message, and use SQLAlchemyError as the side-effect - Rename test to reflect it now validates logging, not just absence of raise - Add session.flush() in test_get_count_returns_correct_value to make the dag_maker-written rows explicitly visible before asserting the count - Replace test_get_count_raises_on_none_result with test_get_count_propagates_db_error that mocks session.execute to raise SQLAlchemyError
- Rephrase COUNT(*) comment: remove the inaccurate "index scan on the PK"
claim; note the query plan is DB-dependent and can be expensive, with
throttling noted as a straightforward future follow-up
- Use baseline+2 instead of hard-coded == 2 in test_get_count_returns_correct_value
so the assertion holds even if other fixtures leave rows in the table
- Replace dict-fold assertion with assert_any_call("serialized_dag.count", 3)
to avoid masking duplicate emissions
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
- Fix get_count() docstring: scalar_one() is justified by "exactly-one-row semantics", not by connectivity-error behavior (both scalar() and scalar_one() raise on connectivity failures) - Narrow except clause from SQLAlchemyError to OperationalError: the intent is to swallow transient connectivity failures only; schema/programming errors from SQLAlchemy should still propagate - Update tests to use OperationalError consistently with the new except scope - Add docstring note to test_get_count_returns_zero_on_empty_table explaining it relies on the autouse setup_test_cases fixture for the empty-table state
…b clear in test - Change `except OperationalError` to `except SQLAlchemyError` in emit_metrics() so all DB failures (disconnects, timeouts, etc.) are swallowed, not just OperationalError - Update test_emit_metrics_logs_and_swallows_db_error to use SQLAlchemyError side-effect to match the exception actually caught by the production code - Add explicit db.clear_db_serialized_dags() in test_get_count_returns_zero_on_empty_table to make the empty-table precondition self-contained and not rely solely on autouse fixture
Comment on lines
+1574
to
+1583
| # COUNT(*) on the serialized_dag table adds one DB round-trip per parse loop. | ||
| # This can be expensive on large deployments (query plan is DB-dependent and | ||
| # may involve a full table scan). The call is isolated so that a transient | ||
| # DB error never kills the parse loop; throttling this metric in the future | ||
| # is a straightforward follow-up if the round-trip becomes a bottleneck. | ||
| try: | ||
| with create_session() as session: | ||
| stats.gauge("serialized_dag.count", SerializedDagModel.get_count(session=session)) | ||
| except SQLAlchemyError: | ||
| log.exception("Failed to emit serialized_dag.count metric") |
…omment
- Add stats.incr("serialized_dag.count_error") in the except block so dashboards
receive an explicit failure signal instead of silently showing a stale gauge value
- Tighten the inline comment to mention the error-counter rationale
- Update test to assert the error counter is incremented on SQLAlchemyError
…ationalError orig
- Wrap the COUNT(*) query in a conf.getboolean guard
(scheduler.emit_serialized_dag_count_metric, default True) so large deployments
can opt out of the extra DB round-trip without code changes
- Patch create_session in both TestEmitMetrics tests so they are pure unit tests
with no real DB session creation
- Pass Exception("db failure") as the orig arg to OperationalError in
test_get_count_propagates_db_error to match real-world error construction
…ix OperationalError params
- Mock conf.getboolean explicitly to True in existing emit_metrics tests so they
are deterministic regardless of the runtime Airflow config
- Add test_emit_metrics_skips_serialized_dag_count_when_disabled to cover the
config=False branch: asserts get_count is not called and no gauge is emitted
- Pass {} (not None) as the params arg to OperationalError to match real SQLAlchemy
error construction and avoid brittle stringification behaviour
2 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Fixes #66796
SELECT COUNT()on an aggregate column always returns an integer —Nonefromsession.scalar()is only possible on a transient DB connectivity failure, not when the table is empty (which returns0). Usingor 0silently swallows that failure and emits a zero count, which can trigger false on-call pages ("all DAGs disappeared!") while hiding the real problem.SerializedDagModel.get_count()classmethod that raisesRuntimeErrorwhensession.scalar()returnsNone, instead of masking it withor 0serialized_dag.countgauge indag_processing/manager.py'semit_metrics()using the new method, wrapped intry/exceptso a DB error logs a warning but never kills the parse loopChanges
airflow-core/src/airflow/models/serialized_dag.py— addget_count()classmethod with None-guardairflow-core/src/airflow/dag_processing/manager.py— importSerializedDagModel, add module logger, emitserialized_dag.countmetric with error isolationTest plan
tests/unit/models/test_serialized_dag.py::TestSerializedDagModel::test_get_count_returns_zero_on_empty_tabletests/unit/models/test_serialized_dag.py::TestSerializedDagModel::test_get_count_returns_correct_valuetests/unit/models/test_serialized_dag.py::TestSerializedDagModel::test_get_count_raises_on_none_resulttests/unit/dag_processing/test_manager.py::TestEmitMetrics::test_emit_metrics_emits_serialized_dag_counttests/unit/dag_processing/test_manager.py::TestEmitMetrics::test_emit_metrics_does_not_raise_on_db_error