Fix multiple_outputs no-op on deferrable KubernetesPodOperator#67226
Fix multiple_outputs no-op on deferrable KubernetesPodOperator#67226paultmathew wants to merge 2 commits into
Conversation
bc4bde8 to
70cfe3a
Compare
KubernetesPodOperator(do_xcom_push=True, multiple_outputs=True, deferrable=True) silently failed to fan out the sidecar's return.json dict into per-key XComs — only `return_value` was published. Downstream tasks subscripting a key (operator.output["foo"] resolving to xcom_pull(key="foo")) got None at runtime with no error. Root cause: trigger_reentry pushed return_value manually inside a finally block and never returned the value to the task runner, so the runner's _push_xcom_if_needed (the code that honors multiple_outputs and fans the dict out) was bypassed. The sync execute_sync path already returns the result for the runner to handle (pod.py:760); this aligns trigger_reentry with that same contract. The failure-path manual push is preserved by moving it inside the event["status"] != "success" branch above the raise — partial sidecar output is still surfaced in XCom when the pod fails, and the behavior is now strictly better: the push happens even when the subsequent _clean call raises (previously the in-finally push was unreachable in that case). Fixes apache#67224 Co-authored-by: Cursor <cursoragent@cursor.com>
70cfe3a to
b3c1666
Compare
Co-authored-by: Cursor <cursoragent@cursor.com>
jscheffl
left a comment
There was a problem hiding this comment.
Thanks for the improvement. So it improves on one end but actually then with the fix leaves the two code path's in an un-fixed state. When either sync or sync execution fails and xcom is pushed it still skips handling multiple outputs.
Can you make it completely consistent?
| # Push manually before the raise — matches the sync-path | ||
| # failure-push in cleanup ("Ensure that existing XCom is | ||
| # pushed even in case of failure"). | ||
| if self.do_xcom_push and xcom_sidecar_output: | ||
| context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output) |
There was a problem hiding this comment.
Thanks for adding for consistency. But now you are repeating the same "bug" like it was before and as you're trying to fix? In the case of failure in async it is the same (bad) behavior like in the sync call that multiple outputs is not handled.
Can you add a utility that is in both ends consistently used that minics the task runner handling for multiple outputs or use a task runner public method for this?
| # On the success path, ``trigger_reentry`` returns the sidecar output and | ||
| # leaves the XCom push to the task runner's ``_push_xcom_if_needed`` — | ||
| # see #67224. The operator no longer pushes ``return_value`` manually here. |
There was a problem hiding this comment.
I think no comment needed on bugifx
| # On the success path, ``trigger_reentry`` returns the sidecar output and | |
| # leaves the XCom push to the task runner's ``_push_xcom_if_needed`` — | |
| # see #67224. The operator no longer pushes ``return_value`` manually here. |
| # Return on success so the task runner's _push_xcom_if_needed handles | ||
| # return_value and multiple_outputs fan-out, matching execute_sync. |
There was a problem hiding this comment.
I think comment not needed as it represents the default.
| # Return on success so the task runner's _push_xcom_if_needed handles | |
| # return_value and multiple_outputs fan-out, matching execute_sync. |
There was a problem hiding this comment.
Pull request overview
Fixes deferrable KubernetesPodOperator to return the XCom sidecar payload on success so the task runner can apply _push_xcom_if_needed (including multiple_outputs fan-out), aligning behavior with the sync execution path.
Changes:
- Update
trigger_reentryto returnxcom_sidecar_outputon success (and stop manually pushingreturn_valuein the success path). - Preserve failure-path behavior by manually pushing
return_valuebefore raising, ensuring partial sidecar output is still captured even if cleanup errors. - Adjust and extend unit tests to reflect/lock in the new return-vs-push contract.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py |
Return sidecar output on successful deferrable re-entry so task runner can handle XCom push and multiple_outputs fan-out; keep manual failure push. |
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py |
Update existing async KPO tests for the new contract and add a test asserting trigger_reentry returns sidecar output when multiple_outputs=True. |
| """``trigger_reentry`` must return the sidecar output so the task runner | ||
| runs ``_push_xcom_if_needed``, which honors ``multiple_outputs`` by fanning | ||
| out the returned dict into per-key XComs. Before #67224 the operator pushed | ||
| ``return_value`` manually inside the ``finally`` block and returned | ||
| ``None``, which silently bypassed the runner's fan-out — making | ||
| ``multiple_outputs=True`` a no-op on deferrable KPO. The sync path's | ||
| ``execute_sync`` already returns ``result`` (line 760 in pod.py); this | ||
| test pins the deferrable path to the same contract. | ||
| """ |
| if do_xcom_push: | ||
| mock_extract_xcom.assert_called_once() | ||
| context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, mock_extract_xcom.return_value) | ||
| assert result is mock_extract_xcom.return_value |
| # ``_push_xcom_if_needed`` (in ``task-sdk/.../execution_time/task_runner.py``) | ||
| # then handles both ``return_value`` push and the ``multiple_outputs`` | ||
| # per-key fan-out, exercised end-to-end in ``test-sdk/`` unit tests. | ||
| assert result is sidecar_output |
Summary
KubernetesPodOperator(do_xcom_push=True, multiple_outputs=True, deferrable=True)silently failed to fan out the sidecar'sreturn.jsondict into per-key XComs — onlyreturn_valuewas published. Downstream tasks subscripting a key gotNoneat runtime with no error.Root cause:
trigger_reentrypushedreturn_valuemanually inside afinallyblock and never returned the value to the task runner, so the runner's_push_xcom_if_needed(the code that honorsmultiple_outputsand fans the dict out) was bypassed.The sync path's
execute_syncalready returnsresult(pod.py:760). This aligns the deferrable path with the same contract.Behaviour change
trigger_reentryreturnsxcom_sidecar_output. The task runner pushesreturn_valueand, whenmultiple_outputs=True, also fans the dict out into per-key XComs.xcom_push(XCOM_RETURN_KEY, ...)moves into theevent["status"] != "success"branch, above theraise. Partial sidecar output is still surfaced in XCom, and the push now happens even when_cleansubsequently raises (strict improvement — previously the in-finallypush was unreachable in that case).multiple_outputsfan-out is deliberately not applied on the failure path: downstream tasks that would consume per-key XComs are inUPSTREAM_FAILEDanyway. The single-keyreturn_valuepush on failure mirrors the sync path's failure-time push atpod.py:1198("Ensure that existing XCom is pushed even in case of failure").History
The deferred-path manual push was introduced by #58488 and refined by #58998. #58488 was titled "Deferred KubernetesPodOperator pushes XCom on successful execution" — the intent was success-path only. #58998 then removed a
returnfrom thefinallyblock (returning fromfinallysilently swallows exceptions) and replaced it withxcom_push, preserving the symptom on both paths. The success-path fan-out was lost in that translation; this PR restores it.Closes
Fixes #67224
Tests
Updated
test_async_kpo_wait_termination_before_cleanup_on_successandtest_async_kpo_wait_termination_before_cleanup_on_failureto reflect the new contract.Added
test_async_trigger_reentry_returns_sidecar_output_for_multiple_outputsto lock in the success-path return value withmultiple_outputs=True.The end-to-end
_push_xcom_if_neededfan-out behavior is already covered by task-SDK unit tests.