Enforce execution_timeout in deferrable KubernetesPodOperator#67229
Enforce execution_timeout in deferrable KubernetesPodOperator#67229paultmathew wants to merge 2 commits into
Conversation
4eb5810 to
fb5e3fb
Compare
5151d22 to
fb5e3fb
Compare
jscheffl
left a comment
There was a problem hiding this comment.
Thanks for the extension, looks good to me. Except some comments.
| last_log_time=last_log_time, | ||
| logging_interval=logging_interval, | ||
| trigger_kwargs=trigger_kwargs, | ||
| execution_deadline=execution_deadline, |
There was a problem hiding this comment.
This adds a coupling between the AWS and CNCF-K8s providers which are packaged into different distributions. If we keep it like this the AWS provider would gain a required dependency of the next future K8s version being available. This dependency would need to be added to pyproject.toml as # use next version
There was a problem hiding this comment.
Switched EksPodTrigger to forward base-trigger kwargs through **kwargs (commit 8dfa1af) rather than listing each parent parameter explicitly. This mirrors GKEStartPodTrigger where *args, **kwargs get forwarded to super().__init__ directly.
The explicit kwarg list was readable as documentation of the supported surface. I think **kwargs + the KubernetesPodTrigger docstring is a reasonable substitute — but if you'd rather keep the explicit list and add a # use next version marker in providers/amazon/pyproject.toml, happy to flip back. Let me know.
There was a problem hiding this comment.
Pull request overview
This PR enforces execution_timeout for KubernetesPodOperator(deferrable=True) by translating the timeout into an absolute deadline, passing it to KubernetesPodTrigger, and adding trigger-side logic to emit a terminal timeout event when the deadline is exceeded (with accompanying unit tests). It also updates the EKS-specific trigger subclass to forward the new parameter.
Changes:
- Add
execution_deadlineplumbing fromKubernetesPodOperator.invoke_defer_method()toKubernetesPodTriggerand pass atimeout=todefer()based on remaining budget. - Add trigger-side deadline enforcement that emits a
status="timeout"event once the deadline is crossed. - Extend/adjust unit tests for trigger serialization and timeout behavior, plus operator deferral plumbing.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py | Compute an absolute execution deadline from ti.start_date and execution_timeout, pass it to the trigger, and set defer(timeout=…). |
| providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py | Add execution_deadline to trigger init/serialization and emit a timeout TriggerEvent when the deadline is exceeded. |
| providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py | Add tests asserting the operator passes execution_deadline (or None) into the trigger and sets defer.timeout appropriately. |
| providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py | Update serialization expectations and add trigger run-loop tests for deadline timeout vs. continued polling. |
| providers/amazon/src/airflow/providers/amazon/aws/triggers/eks.py | Forward the new execution_deadline parameter through EksPodTrigger to the base Kubernetes trigger. |
| # ``trigger_timeout``). | ||
| remaining = execution_deadline - time.time() | ||
| defer_timeout = datetime.timedelta(seconds=max(0.0, remaining)) |
There was a problem hiding this comment.
Fixed by clamping defer_timeout to a 60-second minimum buffer:
remaining = execution_deadline - time.time()
defer_timeout = max(
datetime.timedelta(seconds=remaining),
datetime.timedelta(seconds=60),
)
Rationale for the 60s buffer (vs the alternatives you suggested):
- Don't set
timeoutwhenremaining <= 0: works, but loses the framework backstop entirely. If the trigger hangs (bug, network partition, etc.) the task stays deferred forever. - Fail immediately when
remaining <= 0: cleanest in theory, but invasive — would need to raise an exception frominvoke_defer_methodand route through cleanup. Bigger refactor than the bug warrants. - 60s minimum buffer (chosen): the trigger's first-iteration deadline check (top of
run()) fires within ~poll_intervalseconds (default 2s) and emits the operator-handledstatus="timeout"event. The 60s framework backstop only fires if the trigger is actually hung. Best of both worlds.
Added test test_invoke_defer_method_clamps_defer_timeout_to_minimum_buffer_when_deadline_close that uses time_machine.travel(ti_start + 600s) to put the deadline 300s in the past and asserts defer.timeout == timedelta(seconds=60).
Open to revisiting if you'd prefer a different minimum (or one of the alternative approaches).
fb5e3fb to
8dfa1af
Compare
|
@jscheffl Thanks for the review. I pushed a change and addressed the comments. |
30e6f9a to
448ace1
Compare
Closes: apache#67227 Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2da1d91 to
6eec8cf
Compare
Why + What
KubernetesPodOperator(deferrable=True)does not enforceexecution_timeout. Once the operator defers, the synchronousexecute()returns and thesignal.alarm-based timeout context wrapping it exits cleanly — there is no furtherexecution_timeoutenforcement for the lifetime of the deferral. Pods continue running well pastexecution_timeout, bounded only byactive_deadline_seconds(which defaults to ~1h or whatever the operator passed).The framework gap is acknowledged by
# TODO: handle timeout in case of deferralattask-sdk/.../task_runner.py:1782.This PR fixes the symptom for
KubernetesPodOperator, mirroring the pattern already merged forAirbyteTriggerSyncOperator(PR #64051) andDbtCloudRunJobOperator(PR #66449).Approach
Operator (
pod.py): ininvoke_defer_method, translateexecution_timeoutinto an absolute deadline anchored onti.start_date:execution_deadline = ti.start_date.timestamp() + execution_timeout.total_seconds()execution_deadlinetoKubernetesPodTrigger.timeout=remaining(timedelta) toself.defer()so the framework'strigger_timeoutalso bounds the trigger lifetime as a backstop.ti.start_datekeeps the deadline stable across re-deferrals (e.g.logging_intervalre-entries), since Airflow preserves the originalstart_datewhen a task resumes from defer.contextfromtrigger_reentry→invoke_defer_methodso the deadline is recomputed correctly on each re-defer.Trigger (
pod.py): at the top of_wait_for_container_completion, checktime.time() >= execution_deadlineand emit astatus="timeout"event when the deadline is crossed. The operator's existingtrigger_reentryterminal-event path already handlesstatus in ("error", "failed", "timeout", "success")— the operator fails the task and_clean()runson_finish_action(default: delete pod).Impact
execution_timeoutwas previously a no-op for deferred KPO tasks, and remains a no-op when not set. Tasks withoutexecution_timeoutsee no behaviour change (execution_deadline=None,defer.timeout=None).execution_deadlineparameter onKubernetesPodTriggeris keyword-only with aNonedefault. Trigger serialization adds the field but defaults preserve back-compat for existing serialized triggers (the trigger's__init__accepts the kwarg as optional).on_finish_actionpath handles pod deletion (defaultdelete_pod) when the operator fails on atimeoutevent._clean()already special-casesevent["status"] == "timeout"to skipawait_pod_completion(the pod may hang onErrImagePull/ContainerCreating).Tests
tests/unit/cncf/kubernetes/triggers/test_pod.py):test_serializeto include the newexecution_deadlinekey.test_serialize_with_execution_deadline— round-trips a non-None deadline.test_run_loop_emits_timeout_event_when_execution_deadline_reached— past-deadline → first iteration emitsstatus="timeout"event.test_run_loop_does_not_emit_timeout_when_execution_deadline_not_reached— far-future deadline → trigger keeps polling normally.tests/unit/cncf/kubernetes/operators/test_pod.py):test_invoke_defer_method_passes_execution_deadline_when_execution_timeout_set— operator withexecution_timeout=300spasses a deadline ≈ti.start_date + 300sto the trigger;defer.timeoutis set.test_invoke_defer_method_passes_no_deadline_when_execution_timeout_not_set— operator withoutexecution_timeoutpassesNone(no enforcement, no behaviour change).Backwards Compatibility
No public API changes. New
execution_deadlineparameter onKubernetesPodTriggeris optional with defaultNone. Behaviour change:execution_timeout-equipped deferred KPO tasks now actually fail at the configured timeout instead of running indefinitely; this is the documented contract.Closes
Closes: #67227