Databricks: Fail fast for non-serializable retry_args in deferrable operators and triggers #64960
kosiew wants to merge 8 commits into apache:main
Conversation
Implement a shared validation guard to reject non-serializable databricks_retry_args before deferrable Databricks tasks cross the trigger boundary. Enforce this check for deferrable operators and SQL sensor in databricks.py. Add regression tests to cover failure modes for both in test_databricks.py.
Move validation logic to retry.py for better cohesion. Enforce validation in both trigger constructors within databricks.py. Add direct trigger regression tests in test_databricks.py and update sensor test setup to maintain deferrable branch coverage.
Enhance operators, sensors, and triggers tests to cover two unsupported Tenacity shapes. Tests are now parameterized for `{"wait": wait_incrementing(...)}` and `{"stop": stop_after_attempt(...)}` scenarios.
Extract shared invalid retry-arg test data and pytest.raises assertion into _retry_test_utils.py. Remove duplicated UNSUPPORTED_RETRY_ARGS definitions from operator, sensor, and trigger test files. Simplify setup in operator and sensor negative tests with local helpers for the running deferrable path. Combine two trigger-construction negative tests into one shared parametrized test in test_databricks.py.
Require owner explicitly in retry.py's private helper. Define an UNSUPPORTED_RETRY_ARGS constant in _retry_test_utils.py and update operator, sensor, and trigger tests to parametrize directly from it in test_databricks.py.
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds fail-fast validation for Databricks deferrable retry arguments to ensure they’re serialization-safe before crossing the trigger boundary, improving debuggability when non-serializable Tenacity strategies are passed.
Changes:
- Introduces a `validate_deferrable_databricks_retry_args` helper that checks Airflow serde-serializability of retry args.
- Validates `retry_args` during Databricks trigger initialization to prevent triggerer-side serialization failures.
- Adds unit tests (operators/sensors/triggers) and shared test utilities for unsupported retry args.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| providers/databricks/src/airflow/providers/databricks/utils/retry.py | Adds serde-based validation helper and standardized error message. |
| providers/databricks/src/airflow/providers/databricks/triggers/databricks.py | Calls validation in trigger constructors to fail fast. |
| providers/databricks/tests/unit/databricks/_retry_test_utils.py | Adds shared invalid retry args and assertion helper for tests. |
| providers/databricks/tests/unit/databricks/triggers/test_databricks.py | Adds parametrized tests ensuring triggers reject non-serializable retry args. |
| providers/databricks/tests/unit/databricks/operators/test_databricks.py | Adds deferrable operator test to reject unsupported retry args early. |
| providers/databricks/tests/unit/databricks/sensors/test_databricks.py | Adds deferrable sensor test to reject unsupported retry args early. |
```python
try:
    serde_serialize(retry_args)
except (AttributeError, RecursionError, TypeError) as err:
    raise ValueError(
        f"{owner} does not support non-serializable databricks_retry_args when deferrable=True. "
        "Use JSON-serializable values, remove callable retry strategies, or disable deferrable mode."
    ) from err
```
airflow.sdk.serde.serialize can also fail with ValueError (implementation-dependent), which currently bypasses the intended “clear ValueError” message and may surface a less actionable error. Consider including ValueError in the caught exceptions (or catching a broader serde-specific base exception if available) and re-raising with the standardized message to ensure consistent fail-fast behavior.
Thanks, good catch. I agree the helper should normalize serializer failures into the Databricks-specific message so users get the same fail-fast guidance regardless of which serde exception is raised.
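A minimal sketch of the normalized guard (this mirrors the later `json.dumps`-based commit in this PR; the exact body here is illustrative, not the merged implementation — note that `json.dumps` raises `ValueError` on circular references, so `ValueError` belongs in the caught tuple):

```python
import json


def validate_deferrable_databricks_retry_args(retry_args, *, owner: str) -> None:
    """Fail fast when retry args cannot cross the trigger serialization boundary."""
    if retry_args is None:
        return
    try:
        # json.dumps fails with TypeError on callables (e.g. Tenacity strategies)
        # and with ValueError on circular references.
        json.dumps(retry_args)
    except (AttributeError, RecursionError, TypeError, ValueError) as err:
        raise ValueError(
            f"{owner} does not support non-serializable databricks_retry_args when deferrable=True. "
            "Use JSON-serializable values, remove callable retry strategies, or disable deferrable mode."
        ) from err
```

With the broadened tuple, every serializer failure mode surfaces as the same actionable `ValueError` instead of an implementation-dependent exception.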
```python
raise ValueError(
    f"{owner} does not support non-serializable databricks_retry_args when deferrable=True. "
    "Use JSON-serializable values, remove callable retry strategies, or disable deferrable mode."
) from err
```
The validation is invoked for trigger constructor argument retry_args, but the error message only references databricks_retry_args, which can be confusing when failures occur in trigger init paths. Consider updating the message to mention both (retry_args / databricks_retry_args) or accepting a param_name argument so the error can accurately name the failing parameter depending on call site.
Agreed. The same validation helper is used from both the operator-facing databricks_retry_args path and the trigger-facing retry_args path, so the message should name both to avoid confusion.
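The reviewer's `param_name` idea could look like the following sketch (the `param_name` keyword is hypothetical — it is the reviewer's proposal, not necessarily what was merged; the `json.dumps` check stands in for the serializer):

```python
import json


def validate_deferrable_databricks_retry_args(
    retry_args, *, owner: str, param_name: str = "databricks_retry_args"
) -> None:
    """Sketch: let each call site name the parameter that actually failed."""
    if retry_args is None:
        return
    try:
        json.dumps(retry_args)
    except (TypeError, ValueError, RecursionError) as err:
        raise ValueError(
            f"{owner} does not support non-serializable {param_name} when deferrable=True. "
            "Use JSON-serializable values, remove callable retry strategies, or disable deferrable mode."
        ) from err
```

Operators would use the default, while triggers could pass `param_name="retry_args"` so the error names the argument the user actually supplied.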
```python
    caller: str = "DatabricksExecutionTrigger",
) -> None:
    super().__init__()
    validate_deferrable_databricks_retry_args(retry_args, owner=self.__class__.__name__)
```
The trigger constructor already accepts a caller argument (likely used to report the originating component), but the new validation uses self.__class__.__name__ for owner. If caller is meant to carry more precise context (e.g., operator vs trigger), consider passing owner=caller to keep error attribution consistent with existing patterns.
```diff
- validate_deferrable_databricks_retry_args(retry_args, owner=self.__class__.__name__)
+ validate_deferrable_databricks_retry_args(retry_args, owner=caller)
```
That makes sense. caller is already part of the trigger API and gives better attribution when the trigger is constructed on behalf of an operator or sensor.
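The attribution difference can be shown with a stand-in trigger (this is not the real `DatabricksExecutionTrigger`; the stub validator only checks for callables to keep the sketch self-contained):

```python
def _validate(retry_args, *, owner):
    # Stand-in for validate_deferrable_databricks_retry_args: reject callable values.
    if retry_args and any(callable(v) for v in retry_args.values()):
        raise ValueError(
            f"{owner} does not support non-serializable retry args when deferrable=True."
        )


class FakeExecutionTrigger:
    # Stand-in for DatabricksExecutionTrigger; only the owner-attribution logic is shown.
    def __init__(self, retry_args=None, caller="DatabricksExecutionTrigger"):
        _validate(retry_args, owner=caller)
```

When an operator constructs the trigger on its behalf, passing `caller="DatabricksSubmitRunOperator"` makes the error name the component the user configured, rather than the trigger class.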
```python
{
    "statement_id": STATEMENT_ID,
    "databricks_conn_id": DEFAULT_CONN_ID,
    "end_time": time.time() + 60,
```
Using time.time() in test data makes the case slightly non-deterministic and harder to reason about, especially since end_time is not relevant to the behavior under test (retry args validation). Consider replacing it with a fixed constant (e.g., end_time=1234567890.0) to keep the test fully deterministic.
```diff
- "end_time": time.time() + 60,
+ "end_time": 1234567890.0,
```
Agreed. The timestamp is incidental to retry validation, so a fixed constant is clearer.
```python
from typing import Any

from airflow.sdk.serde import serialize as serde_serialize


def validate_deferrable_databricks_retry_args(retry_args: dict[Any, Any] | None, *, owner: str) -> None:
```
The signature dict[Any, Any] suggests arbitrary key types are acceptable, but the function’s intent is “JSON/serde-serializable” retry configuration, which typically implies string keys (JSON object keys). Consider narrowing the type to Mapping[str, Any] | None (or dict[str, Any] | None) to better document the expected API contract and help callers catch issues earlier via typing.
```diff
- from typing import Any
+ from collections.abc import Mapping
+ from typing import Any

  from airflow.sdk.serde import serialize as serde_serialize

- def validate_deferrable_databricks_retry_args(retry_args: dict[Any, Any] | None, *, owner: str) -> None:
+ def validate_deferrable_databricks_retry_args(
+     retry_args: Mapping[str, Any] | None, *, owner: str
+ ) -> None:
```
Agreed. A mapping type better documents that the helper only reads the retry configuration, and str keys better reflect the public retry-args contract.
Refactor retry.py to catch ValueErrors and clarify retry_args/databricks_retry_args messages. Adjust validation in databricks.py to use owner=caller. Update tests in operators, sensors, and triggers for Databricks. Fix test-helper import to follow repo style.
Replace SDK serde import with stdlib JSON serialization in retry.py. Update validation call to use json.dumps() instead of serde_serialize() to improve simplicity and reduce dependencies.
Implement tests for the retry validation function in the Databricks provider. Handle cases for `None` and valid JSON-serializable primitive retry configurations, while ensuring unsupported Tenacity retry arguments are rejected.
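The shared test-data pattern these commits describe can be sketched as follows (names follow the PR's description of `_retry_test_utils.py`, but the bodies are stand-ins: lambdas mimic the callable nature of real Tenacity strategies so the sketch needs only the stdlib):

```python
import json

# Stand-ins for tenacity.wait_incrementing(...) / stop_after_attempt(...): like the
# real strategies, these are callables, which JSON cannot encode.
UNSUPPORTED_RETRY_ARGS = [
    {"wait": lambda retry_state: 1.0},
    {"stop": lambda retry_state: True},
]


def validate_deferrable_databricks_retry_args(retry_args, *, owner):
    # Sketch of the helper under test (json.dumps variant from the commit above).
    if retry_args is None:
        return
    try:
        json.dumps(retry_args)
    except (TypeError, ValueError, RecursionError) as err:
        raise ValueError(
            f"{owner} does not support non-serializable databricks_retry_args"
        ) from err


def assert_rejects_unsupported_retry_args(owner="DatabricksExecutionTrigger"):
    # Shared assertion helper, analogous to the one extracted for the test files.
    for retry_args in UNSUPPORTED_RETRY_ARGS:
        try:
            validate_deferrable_databricks_retry_args(retry_args, owner=owner)
        except ValueError:
            continue
        raise AssertionError(f"{retry_args!r} was not rejected")


# None and plain primitives must pass; the Tenacity-shaped configs must fail.
validate_deferrable_databricks_retry_args(None, owner="X")
validate_deferrable_databricks_retry_args({"reraise": True}, owner="X")
assert_rejects_unsupported_retry_args()
```

In the real suite, the operator, sensor, and trigger tests parametrize directly over the shared `UNSUPPORTED_RETRY_ARGS` constant rather than duplicating the data.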
Summary
Validate `databricks_retry_args` / `retry_args` for deferrable Databricks operators, sensors, and triggers to ensure they are serialization-safe before crossing the trigger boundary. Non-serializable values (e.g., Tenacity strategy callables) now raise a clear `ValueError` at initialization/execution time.
Motivation / Problem
Deferrable Databricks execution forwards retry configuration through the trigger serialization boundary. Non-serializable objects (such as `tenacity.wait_incrementing(...)` or `tenacity.stop_after_attempt(...)`) cannot be serialized by Airflow's serde layer and fail at runtime in the triggerer, making debugging difficult.
What this PR does
Introduces a `validate_deferrable_databricks_retry_args` utility to assert JSON/serde-serializability of retry args.
Invokes validation in:
- `DatabricksExecutionTrigger`
- `DatabricksSQLStatementExecutionTrigger`
Ensures deferrable operator/sensor paths fail fast when invalid retry args are provided.
Adds comprehensive unit tests covering:
Behavior change
Before: a late serialization failure at runtime in the triggerer, which is hard to debug.
After: an immediate `ValueError` with an actionable message.
Example of unsupported config
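A minimal illustration of why such a config fails (a callable stand-in replaces `tenacity.wait_incrementing(...)` so the snippet needs only the stdlib; real Tenacity strategies are likewise callable objects):

```python
import json

# With tenacity installed, the unsupported shape looks like:
#   databricks_retry_args={"wait": tenacity.wait_incrementing(...)}
# Strategy objects are callables, which JSON cannot encode:
unsupported = {"wait": lambda retry_state: 1.0}

try:
    json.dumps(unsupported)  # effectively what the serializability guard checks
except TypeError as err:
    print(f"rejected: {err}")
```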
Example of supported config
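A minimal illustration of a config that passes the guard (assumption: `reraise` stands in for any JSON-serializable Tenacity option; the exact set of supported keys is defined by the hook's retry handling, not shown here):

```python
import json

# Plain JSON-serializable primitives survive the trigger boundary.
supported = {"reraise": True}
json.dumps(supported)  # succeeds, so deferrable mode can forward it safely
```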
Implementation details
- Adds the helper in `providers/databricks/utils/retry.py`
- Uses Airflow's serde layer (`airflow.sdk.serde.serialize`) to validate compatibility
- Catches `AttributeError`, `RecursionError`, `TypeError` and rethrows as `ValueError`
Tests
Added shared test utilities for invalid retry args
Parametrized tests using Tenacity objects:
- `wait_incrementing`
- `stop_after_attempt`
Coverage includes:
Backward compatibility
Documentation
Checklist
Was generative AI tooling used to co-author this PR?
Codex
GitHub Copilot
ChatGPT