Deadline Alerts: Add dynamic interval resolution support via Variables#64751
SameerMesiah97 wants to merge 1 commit into apache:main
Conversation
Force-pushed 08de7dd to 7a6c579
This is in draft, but a few thoughts on this:
@ferruzzi recommended in the original issue handling this when the deadline is calculated.
```python
# Resolve dynamic interval providers (e.g. VariableInterval)
resolve = getattr(interval, "resolve", None)
if callable(resolve):
    interval = resolve()
```
getattr(interval, "resolve", None) will call .resolve() on any object that has it, not just VariableInterval. For example, if someone accidentally passes an object with an unrelated .resolve() that returns a string, the encoder calls it and then crashes later with a confusing error. We can use a Protocol so the contract is explicit and caught at lint time.
```python
# With Protocol: explicit contract
from datetime import timedelta  # needed for the annotation below
from typing import Protocol


class ResolvableInterval(Protocol):
    def resolve(self) -> timedelta: ...


# Now you can use it in type annotations:
def __init__(self, interval: timedelta | ResolvableInterval): ...
```
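The Protocol suggested above can also be made `runtime_checkable`, so the call site only invokes `resolve()` on objects that declare the contract rather than on anything with a `.resolve` attribute. A minimal sketch (`FixedInterval` and `materialize` are illustrative names, not part of the PR); note that `runtime_checkable` only verifies the method exists, so it narrows but does not eliminate the wrong-return-type risk described above:

```python
from __future__ import annotations

from datetime import timedelta
from typing import Protocol, runtime_checkable


@runtime_checkable
class ResolvableInterval(Protocol):
    def resolve(self) -> timedelta: ...


class FixedInterval:
    """Illustrative stand-in: structurally satisfies ResolvableInterval."""

    def __init__(self, minutes: float) -> None:
        self.minutes = minutes

    def resolve(self) -> timedelta:
        return timedelta(minutes=self.minutes)


def materialize(interval: timedelta | ResolvableInterval) -> timedelta:
    # Only call resolve() on objects implementing the protocol,
    # instead of on anything that happens to have a .resolve attribute.
    if isinstance(interval, ResolvableInterval):
        return interval.resolve()
    return interval
```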
Force-pushed 7a6c579 to 1acd5ea
@seanghaeli may want to keep an eye on this one as well.
This is currently not reviewable. Please give me a bit more time to smooth out a few rough edges.
Force-pushed a700235 to 14997b7
…pporting interval objects (e.g. VariableInterval) that resolve at evaluation time. Migrate deadline_alert.interval from Float to JSON to support serialized interval objects. Ensure intervals are materialized to timedelta at runtime. Add unit tests for interval resolution and DAGRun tests validating deadline behavior and stability across updates.
Force-pushed 14997b7 to 598ede2
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds support for dynamically configured DeadlineAlert intervals via an interval object (VariableInterval) resolved at deadline evaluation time, and migrates persisted alert intervals from Float to JSON so serialized interval objects can be stored.
Changes:
- Introduces `VariableInterval` (backed by Airflow Variables) and allows `DeadlineAlert.interval` to accept resolvable intervals.
- Updates deadline alert serialization/deserialization to store intervals as serialized objects (with backward compatibility for numeric seconds).
- Migrates `deadline_alert.interval` from `Float` to `JSON`, and updates docs/tests accordingly.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| task-sdk/src/airflow/sdk/definitions/deadline.py | Adds _ResolvableInterval protocol and VariableInterval implementation; broadens accepted interval types. |
| task-sdk/tests/task_sdk/definitions/test_deadline.py | Adds unit tests for VariableInterval.resolve() valid/invalid cases. |
| airflow-core/src/airflow/serialization/encoders.py | Serializes DeadlineAlert.interval via generic serialize() rather than total_seconds(). |
| airflow-core/src/airflow/serialization/decoders.py | Adds backward-compatible decoding for numeric intervals and deserializes interval objects. |
| airflow-core/src/airflow/serialization/definitions/dag.py | Resolves VariableInterval during DagRun deadline evaluation before using interval. |
| airflow-core/src/airflow/models/deadline_alert.py | Changes ORM column type to JSON and adjusts __repr__ to handle non-float interval shapes. |
| airflow-core/src/airflow/migrations/versions/0111_3_3_0_change_deadline_interval_to_json.py | Adds migration converting deadline_alert.interval from Float to JSON, with downgrade logic. |
| airflow-core/src/airflow/utils/db.py | Updates the 3.3.0 migration head revision mapping. |
| airflow-core/tests/unit/models/test_serialized_dag.py | Updates assertion for newly JSON-serialized interval representation. |
| airflow-core/tests/unit/models/test_dag.py | Updates interval extraction from stored alerts to handle JSON representation. |
| airflow-core/tests/unit/models/test_dagrun.py | Adds DagRun tests for VariableInterval stability and missing-variable failure; parameterizes an existing success test. |
| airflow-core/docs/howto/deadline-alerts.rst | Documents that interval can be a timedelta or dynamic interval object (e.g., VariableInterval). |
| airflow-core/docs/migrations-ref.rst | Adds the new migration to the migrations reference table. |
| airflow-core/newsfragments/64751.feature.rst | Announces dynamic interval support for deadline alerts. |
```python
return decorator


@attrs.define
```
DeadlineAlert.__hash__ hashes self.interval, but @attrs.define defaults can make instances unhashable when eq=True (common default), which will raise TypeError: unhashable type: 'VariableInterval' when a DeadlineAlert is hashed. Make VariableInterval explicitly hashable (e.g., @attrs.define(frozen=True) or an explicit __hash__) or update DeadlineAlert.__hash__ to avoid hashing non-hashable intervals.
```diff
-@attrs.define
+@attrs.define(unsafe_hash=True)
```
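The eq/hash interaction described here also exists in stdlib dataclasses, so it can be demonstrated without attrs (a sketch; `UnhashableInterval` and `HashableInterval` are illustrative names, and attrs' `frozen=True` / `unsafe_hash=True` behave analogously):

```python
from dataclasses import dataclass


@dataclass  # eq=True by default, so __hash__ is set to None: instances are unhashable
class UnhashableInterval:
    key: str


@dataclass(frozen=True)  # frozen + eq restores a field-based __hash__
class HashableInterval:
    key: str
```

`hash(UnhashableInterval("k"))` raises `TypeError: unhashable type`, while `HashableInterval` hashes consistently by its fields.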
| """ | ||
|
|
||
| key: str | ||
|
|
Same hashability concern as above; an explicit `__hash__` on the key is an alternative fix:
```python
def __hash__(self) -> int:
    return hash(self.key)
```
```python
if isinstance(self.interval, (int, float)):
    interval_seconds = int(self.interval)
elif isinstance(self.interval, datetime.timedelta):
    interval_seconds = int(self.interval.total_seconds())
else:
    interval_display = "dynamic"

if interval_seconds >= 3600:
    interval_display = f"{interval_seconds // 3600}h"
```
interval_seconds is not set in the final else branch, but is used unconditionally afterward (if interval_seconds >= 3600). For non-numeric / non-timedelta intervals (e.g., serialized dicts or dynamic interval objects), __repr__ will raise UnboundLocalError. Restructure __repr__ to either return/format immediately for dynamic intervals or initialize interval_seconds to a sentinel and branch safely.
```python
if dialect == "postgresql":
    op.execute("""
        UPDATE deadline_alert
        SET interval =
            CASE
                WHEN interval::jsonb ? '__data__'
                THEN to_json((interval->>'__data__')::double precision)
                ELSE to_json((interval::text)::double precision)
            END
    """)
```
The PostgreSQL downgrade path will fail for serialized non-timedelta JSON objects (e.g. VariableInterval) because the `ELSE to_json((interval::text)::double precision)` branch attempts to cast a JSON object's text (like `{"__classname__": ...}`) to double precision, which will error and abort the migration. Similarly, `postgresql_using="(interval->>'__data__')::double precision"` will fail when `__data__` is absent. Update the downgrade to only convert rows that are known numeric or known datetime.timedelta serialized shapes (e.g. check `interval->>'__classname__' = 'datetime.timedelta'` or `jsonb_typeof(interval::jsonb) = 'number'`) and decide an explicit fallback for unsupported dynamic objects (e.g. set to NULL, a safe default, or raise with a targeted message).
```python
with op.batch_alter_table("deadline_alert") as batch_op:
    if dialect == "postgresql":
        batch_op.alter_column(
            "interval",
            existing_type=sa.JSON(),
            type_=sa.FLOAT(),
            postgresql_using="(interval->>'__data__')::double precision",
            existing_nullable=False,
        )
```
Same downgrade concern as above: `postgresql_using="(interval->>'__data__')::double precision"` will also fail when `__data__` is absent.
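The row-shape guard this asks for can be sketched as plain Python decision logic first (hedged: `downgradable_seconds` is a hypothetical helper, and the envelope keys mirror the serialized-timedelta shape shown in this migration's offline-mode snippet):

```python
def downgradable_seconds(raw):
    """Return float seconds for shapes the downgrade can convert to Float,
    or None for dynamic objects (e.g. a serialized VariableInterval)."""
    # Plain numbers: the pre-JSON storage format (total_seconds()).
    if isinstance(raw, (int, float)) and not isinstance(raw, bool):
        return float(raw)
    # Serialized timedelta envelope: {"__classname__": "datetime.timedelta",
    # "__data__": <seconds>}.
    if isinstance(raw, dict) and raw.get("__classname__") == "datetime.timedelta":
        data = raw.get("__data__")
        if isinstance(data, (int, float)):
            return float(data)
    # Anything else has no stored numeric value to keep on downgrade.
    return None
```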
```python
raw_interval = data[DeadlineAlertFields.INTERVAL]

# Backward compatibility: previously interval was stored as total_seconds() (float/int).
# Handle numeric values by converting to timedelta.
if isinstance(raw_interval, (int, float)):
    interval = datetime.timedelta(seconds=raw_interval)
else:
    interval = cast("datetime.timedelta", deserialize(raw_interval))
```
The `cast("datetime.timedelta", ...)` is misleading now that `deserialize(raw_interval)` can return non-timedelta interval objects (e.g. VariableInterval). This doesn't affect runtime behavior, but it hides type mismatches from linters/type-checkers and makes it easier to accidentally treat the result as always a timedelta. Prefer widening the type (e.g. `timedelta | VariableInterval | ...`) and removing or adjusting the cast accordingly.
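A sketch of the widened decode path without the narrowing cast (hedged: `deserialize` here is a minimal stand-in for Airflow's generic deserializer, handling only the timedelta envelope used in this PR's examples):

```python
import datetime


def deserialize(raw):
    # Hypothetical stand-in for Airflow's generic deserialize(): only
    # reconstructs the serialized-timedelta envelope for illustration.
    if isinstance(raw, dict) and raw.get("__classname__") == "datetime.timedelta":
        return datetime.timedelta(seconds=raw["__data__"])
    return raw


def decode_interval(raw):
    # Numeric values are the legacy total_seconds() storage format;
    # everything else round-trips through deserialize() and may come back
    # as a timedelta *or* a resolvable interval object, so no cast.
    if isinstance(raw, (int, float)):
        return datetime.timedelta(seconds=raw)
    return deserialize(raw)
```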
```diff
@@ -0,0 +1 @@
+Allow DeadlineAlert intervals to be dynamically resolved at DAG parse time using objects such as VariableInterval.
```
The newsfragment says intervals are resolved at 'DAG parse time', but the PR description and implementation resolve VariableInterval during DagRun deadline evaluation/creation. Please update the fragment text to match the actual behavior to avoid user confusion.
```python
if context.is_offline_mode():
    print(
        """
        Manual conversion required:

        PostgreSQL:
        UPDATE deadline_alert
        SET interval = json_build_object(
            '__classname__', 'datetime.timedelta',
            '__version__', 2,
            '__data__', (interval::text)::float
        )
        WHERE jsonb_typeof(interval::jsonb) = 'number';
```
The offline-mode 'Manual conversion required' instructions appear inconsistent with the actual upgrade sequence: they do not include the required ALTER TABLE/type change step, and the PostgreSQL WHERE jsonb_typeof(interval::jsonb) predicate implies interval is already JSON (but offline mode returns before any conversion). Consider updating the offline instructions to be a complete, executable sequence (type change + transformation) and ensure predicates match the pre/post column type at each step.
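A complete offline sequence for PostgreSQL might look like the following. This is a hedged, untested sketch: it assumes the column starts as the original NOT NULL Float (double precision), and it performs the type change before any JSON predicate is used, so each predicate matches the column type at that step:

```sql
-- 1. Change the column type first; to_json() wraps each float as a JSON number.
ALTER TABLE deadline_alert
    ALTER COLUMN interval TYPE JSON USING to_json(interval);

-- 2. Only now is the jsonb_typeof predicate valid: wrap plain numbers in the
--    serialized-timedelta envelope expected by the decoder.
UPDATE deadline_alert
SET interval = json_build_object(
    '__classname__', 'datetime.timedelta',
    '__version__', 2,
    '__data__', (interval::text)::float
)
WHERE jsonb_typeof(interval::jsonb) = 'number';
```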
Description
This change allows deadline intervals to be configured dynamically via Airflow Variables by introducing support for interval objects (e.g. `VariableInterval`). `VariableInterval` defers resolution until deadline evaluation time, where it retrieves the Variable value (interpreted as minutes) and converts it into a `timedelta`. This ensures that intervals are always materialized to `timedelta` before being used at runtime.

To support this, the `deadline_alert.interval` column is migrated from `Float` to `JSON`, allowing serialized interval objects to be stored instead of only numeric values.

Rationale
The primary motivation is to enable dynamic configuration of deadline intervals using Airflow Variables without requiring changes to DAG code. This allows operators to adjust deadline behavior externally, making the feature more flexible and easier to manage in production environments.
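As a sketch of the opt-in shape this enables (hedged: the Variable store below is a hypothetical in-memory stand-in for Airflow's `Variable` storage, and the minutes interpretation follows the description above):

```python
from dataclasses import dataclass
from datetime import timedelta

# Hypothetical stand-in for Airflow's Variable storage.
_VARIABLES = {"deadline_minutes": "45"}


@dataclass(frozen=True)  # frozen keeps instances hashable
class VariableInterval:
    key: str

    def resolve(self) -> timedelta:
        # Value is interpreted as minutes; a missing Variable or a
        # non-numeric value raises, failing loudly rather than silently.
        return timedelta(minutes=float(_VARIABLES[self.key]))
```

Resolution happens at deadline evaluation, so `VariableInterval("deadline_minutes").resolve()` re-reads the store each time it is called.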
An explicit `VariableInterval` type is used instead of implicitly resolving Variables to avoid silent fallbacks and hidden side effects. This makes the behavior opt-in and predictable, ensuring that DAG authors are aware when interval values are dynamically sourced from Variables.

Resolving intervals at deadline evaluation time ensures that values are materialized to `timedelta` just before use, while remaining stable for the lifetime of a DAG Run. This approach avoids introducing additional execution-time mechanisms and keeps the implementation aligned with existing deadline evaluation behavior.

Migrations
The migration `0111_3_3_0_change_deadline_interval_to_json.py` updates the `deadline_alert.interval` column from `Float` to `JSON` to support storing serialized interval objects. Existing numeric values are preserved and continue to be supported during decoding for backwards compatibility. Serialized `VariableInterval` objects are skipped during downgrade as they do not contain interval values.

Tests
Added tests covering:

- `VariableInterval.resolve()` for valid and invalid Variable values
- DAG Run behavior with `VariableInterval`, including deadline stability across updates and failure when the Variable is missing

Other tests have been updated to accommodate the serialization of the `timedelta` object where necessary.

Documentation

- Documents that `DeadlineAlert` intervals may be dynamically resolved via `VariableInterval`.
- Documents `VariableInterval` with a usage example and behavior notes.

Backwards Compatibility
Existing `timedelta`-based intervals continue to work unchanged, and previously stored numeric interval values remain supported during decoding to ensure backwards compatibility. Dynamic interval support is introduced as an opt-in feature via `VariableInterval`, allowing users to adopt it incrementally. A database migration is required to change the `interval` column from `Float` to `JSON` to support serialized interval objects, but existing data remains compatible and continues to function without modification.

Closes: #63852