Fix KubernetesJobTrigger hang for parallelism > completions case (#64867) #65058
holmuk wants to merge 1 commit into apache:main
Conversation
Pull request overview
This PR fixes a deferrable KubernetesJobOperator / KubernetesJobTrigger hang when Kubernetes parallelism > completions by making trigger completion primarily driven by the Job’s terminal state (Complete/Failed) rather than waiting for every pod from an initial “snapshot” to reach a terminal state. It also adds regression tests to cover the reported scenario (#64867).
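The job-first strategy described above can be sketched with a minimal asyncio model. This is a hypothetical simplification (the coroutine names and sleeps are invented, not the provider's actual code), showing why waiting on the Job's terminal state instead of every snapshot pod avoids the hang:

```python
import asyncio

async def wait_for_job_complete() -> str:
    await asyncio.sleep(0.02)  # the Job reaches Complete once completions=1 is met
    return "Complete"

async def wait_for_pod(pod_name: str, finishes: bool) -> str:
    # With parallelism > completions, a surplus pod may never finish.
    await asyncio.sleep(0.01 if finishes else 3600)
    return pod_name

async def run_trigger() -> str:
    job_task = asyncio.create_task(wait_for_job_complete())
    pod_tasks = [
        asyncio.create_task(wait_for_pod("pod-a", finishes=True)),
        asyncio.create_task(wait_for_pod("pod-b", finishes=False)),
    ]
    # Job-first: return as soon as the Job is terminal instead of
    # requiring every snapshot pod to reach a terminal state.
    await job_task
    for t in pod_tasks:
        if not t.done():
            t.cancel()
    return job_task.result()

print(asyncio.run(run_trigger()))  # "Complete" despite pod-b never finishing
```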
Changes:
- Reworks `KubernetesJobTrigger.run()` to wait for Job completion concurrently and collect XCom from pods on a best-effort basis (skipping missing/deleted pods).
- Adds regression tests to ensure the trigger doesn't hang when some snapshot pods never complete or are deleted.
- Adds an operator regression test verifying `execute_complete` tolerates partial XCom results.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py` | Changes trigger control flow to be job-first and makes XCom extraction best-effort without blocking task finalization. |
| `providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_job.py` | Adds async regression tests for parallelism > completions pod snapshot edge cases and updates a job polling assertion. |
| `providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py` | Adds a regression test ensuring `execute_complete` handles partial XCom payload lists. |
Force-pushed from 475be15 to 55ab20b
**jscheffl** left a comment:

Looks good to me but I am not really an expert with K8s Jobs, so I have a hard time judging details of the fix. Looking for a second maintainer review.
**Nataneljpwd** left a comment:

Seems a little too complex; some improvements can be made to simplify it.
```python
if not job_task.done():
    job_task.cancel()
    with suppress(asyncio.CancelledError):
        await job_task
```
This looks a little weird: we retry in the `finally` no matter what. Even if `job_task` threw an exception or hasn't finished yet, we retry; but once an API request has been sent, you cannot cancel it, so I see a case where the request was sent, the Job was created, the task was cancelled, and the retry then creates the Job again, either failing because of a unique-name constraint or running the Job twice.

It generally looks weird to retry in the `finally` block. I would suggest either handling the exception as intended or putting the try/except only around the XCom collection.
`job_task` doesn't create a new Job in k8s; it only checks the status of an existing Job. In the `finally`, if `job_task` is still running, we cancel it to avoid leaving a background coroutine alive. The second `await job_task` only waits for the cancellation to complete.
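The cancel-then-await pattern discussed here can be shown in isolation. A minimal sketch with invented names (not the provider's actual code), demonstrating that the second await only consumes the cancellation rather than restarting anything:

```python
import asyncio
from contextlib import suppress

async def poll_job_status() -> str:
    # Stand-in for a coroutine that polls an existing Job's status indefinitely.
    await asyncio.sleep(3600)
    return "Complete"

async def run_with_cleanup() -> bool:
    job_task = asyncio.create_task(poll_job_status())
    try:
        await asyncio.sleep(0.01)  # do other work while the poller runs
    finally:
        if not job_task.done():
            # Cancel the still-running poller so no background coroutine leaks;
            # awaiting it afterwards only waits for the cancellation to finish
            # and consumes the resulting CancelledError.
            job_task.cancel()
            with suppress(asyncio.CancelledError):
                await job_task
    return job_task.cancelled()

print(asyncio.run(run_with_cleanup()))  # True: the poller was cancelled, not retried
```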
```python
if wait_task in done:
    try:
        await wait_task
```
Why do we await a task which is already done?
`done()` means that the task has finished, but checking it doesn't raise exceptions or return results. We await the finished task to get its final result or exception.
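This behavior is easy to verify in a small sketch (hypothetical coroutine, not the provider's code): `task.done()` reports completion without surfacing anything, while awaiting the finished task re-raises its exception.

```python
import asyncio

async def boom() -> None:
    raise RuntimeError("pod watch failed")

async def main() -> tuple[bool, str]:
    task = asyncio.create_task(boom())
    done, _pending = await asyncio.wait({task})
    finished = task in done and task.done()
    # The task is done here, but its exception has not been raised yet;
    # awaiting the finished task is what actually surfaces it.
    try:
        await task
        return finished, "no error"
    except RuntimeError as exc:
        return finished, str(exc)

print(asyncio.run(main()))
```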
```python
async with semaphore:
    try:
        return await asyncio.wait_for(
            self._extract_xcom_for_pod_best_effort(pod_name=pod_name),
            timeout=per_pod_timeout,
        )
    except asyncio.TimeoutError:
        self.log.warning(
            "Timed out extracting XCom from pod '%s' after job completion; skipping.",
            pod_name,
        )
        return PodXComAttempt(pod_name=pod_name, outcome="timeout")
```
Why is a semaphore used here?
To limit the number of concurrent XCom extraction tasks; otherwise we could put a burden on the Kubernetes API when the number of pods is high.
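The bounding effect of the semaphore can be demonstrated with a standalone sketch. The names and the limit are invented for illustration; the counter records how many "extractions" run at once:

```python
import asyncio

MAX_CONCURRENT_EXTRACTIONS = 3  # hypothetical limit

async def extract_xcom(pod_name: str, counter: dict) -> str:
    counter["active"] += 1
    counter["peak"] = max(counter["peak"], counter["active"])
    await asyncio.sleep(0.01)  # stand-in for the API call to the pod
    counter["active"] -= 1
    return f"xcom-from-{pod_name}"

async def extract_all(pod_names: list[str]) -> tuple[list[str], int]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_EXTRACTIONS)
    counter = {"active": 0, "peak": 0}

    async def bounded(pod: str) -> str:
        # The semaphore caps how many extractions hit the API at once.
        async with semaphore:
            return await extract_xcom(pod, counter)

    results = await asyncio.gather(*(bounded(p) for p in pod_names))
    return list(results), counter["peak"]

results, peak = asyncio.run(extract_all([f"pod-{i}" for i in range(10)]))
print(peak)  # never exceeds MAX_CONCURRENT_EXTRACTIONS
```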
```python
    job_task=job_task,
)
if completion_outcome == "job_done":
    post_job_pod_names = self.pod_names[pod_index:]
```
Just use the `pod_name` variable.
`pod_name` is the pod where we observed the `job_done` status, and we are going to extract XCom from this pod and from all the pods that follow it in the snapshot. That's why we can't use `pod_name` alone.
Force-pushed from 55ab20b to b8fb3ea
Closes #64867
Was generative AI tooling used to co-author this PR?
Cursor
This PR resolves the hanging `Running` state issue in `KubernetesJobOperator`/`KubernetesJobTrigger` for `deferrable=True` / `do_xcom_push=True`.

Problem description
The trigger waits for container completion for every pod name from a precomputed snapshot (`pod_names`) before checking the final Job status. That snapshot is built from pod discovery tied to `parallelism`, not to actual successful completions.
Example (`parallelism=2`, `completions=1`):
- The Job becomes `Complete` (`completions=1` reached) while the surplus pod may never reach a terminal state.
- `KubernetesJobTrigger` keeps waiting on the second pod and does not reach Job-status evaluation, so the task can remain Running/Deferred forever.

Proposed fix: task completion should be driven by the Job's terminal status (`Complete`/`Failed`), which already reflects `completions`.

What does this PR do?

Updates the logic of `KubernetesJobTrigger`.

Regression tests for #64867
Trigger regression tests (`triggers/test_job.py`):
- `test_run_completes_when_job_is_done_even_if_some_snapshot_pods_never_complete`: verifies the trigger does not hang when a pod from the initial snapshot never reaches a terminal state after the Job is already complete.
- `test_run_skips_deleted_snapshot_pod_and_completes_when_job_is_done`: verifies the trigger handles stale snapshot pods gracefully by skipping `404 Not Found` pods and still finishing successfully with the available XCom results.
- `test_run_collects_later_pod_xcom_best_effort_after_job_done`: verifies post-completion best-effort behavior: once the Job is already complete, the trigger continues processing the remaining snapshot pods, skips per-pod extraction failures, and still returns XCom from the pods that can be read.

Operator regression test (`operators/test_job.py`):
- `test_execute_complete_supports_partial_xcom_results`: verifies `execute_complete` correctly handles partial `xcom_result` payloads (fewer XCom entries than the initial pod snapshot), which is expected in `parallelism > completions` scenarios.

Additional tests for new code:
- `test_wait_until_container_state_or_job_done_does_not_restart_wait_task`: Copilot pointed out that a naive implementation of the waiting loop may not work as expected on slow clusters because of constant coroutine retrying. The test validates that we don't recreate `wait_method` on every tick on a slow cluster.

Behavior change
- `xcom_result` may be partial (fewer entries than the initial `pod_names`); this is expected.
- Missing pods (`404`) do not fail task completion.

Risks
- Depending on `poll_interval`, the new bounded wait loop may generate extra timeout/cancel/retry iterations while waiting for pod container states. This does not fail the task by itself (it is expected retry behavior), but it can increase polling overhead and log noise until the Job reaches a terminal state.
- Per-pod XCom extraction outcomes are reported (`succeeded`, `skipped_missing`, `timed_out`, `failed_other`).

A newsfragment named `{pr_number}.significant.rst` in `airflow-core/newsfragments` can be added in a follow-up commit after the PR is created, once the PR number is known.
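Since `xcom_result` may now be partial, consumers need to map whatever entries arrive back onto the full pod snapshot. A minimal sketch of that tolerant merge (the `merge_xcom_results` helper and the payload shape are hypothetical, not the operator's actual API):

```python
from __future__ import annotations

def merge_xcom_results(
    pod_names: list[str], xcom_result: list[dict]
) -> dict[str, dict | None]:
    # With parallelism > completions, xcom_result may hold fewer entries than
    # the initial pod snapshot; map what we have and mark the rest as missing.
    by_pod = {entry["pod_name"]: entry for entry in xcom_result}
    return {name: by_pod.get(name) for name in pod_names}

merged = merge_xcom_results(
    ["pod-a", "pod-b"],
    [{"pod_name": "pod-a", "value": 42}],  # pod-b never produced XCom
)
print(merged["pod-b"])  # None: missing pods don't fail the merge
```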