Under which category would you file this issue?
Providers
Apache Airflow version
3.2.1
What happened and how to reproduce it?
When using KubernetesJobOperator with do_xcom_push=True, the task returns the content of the JSON file found at /airflow/xcom/return.json in the k8s pod as a return_value XCom. However, the data type of the returned XCom value varies depending on whether the deferrable parameter is set to True or False:
- When deferrable=True is supplied, KubernetesJobOperator returns an XCom value of type dict.
- When deferrable=False is supplied, KubernetesJobOperator returns an XCom value of type list[dict].
This inconsistency can break downstream XCom consumers. For instance, switching a KubernetesJobOperator task from deferrable to non-deferrable changes the type of its XCom value, which can break the downstream task that consumes it.
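Until the two modes agree, a downstream consumer can be written to tolerate both shapes. This is a minimal sketch, assuming the pod writes a single JSON object to return.json (as in the reproduction below); normalize_xcom and echo_xcom_items are hypothetical helpers, not part of the provider:

```python
from typing import Any


def normalize_xcom(xcom_value: Any) -> list[dict[str, Any]]:
    """Return the XCom as a list of dicts regardless of which mode produced it.

    Hypothetical helper: per this report, the deferrable path (with the default
    unwrap_single=True) pushes a single dict, while the non-deferrable path
    pushes a list of dicts.
    """
    if isinstance(xcom_value, dict):
        # Deferrable mode: the single result was unwrapped, so re-wrap it.
        return [xcom_value]
    if isinstance(xcom_value, list):
        # Non-deferrable mode: the value is already a list.
        return xcom_value
    raise TypeError(f"Unexpected XCom type: {type(xcom_value)!r}")


def echo_xcom_items(xcom_value: Any) -> None:
    """Log each result with a consistent per-item type."""
    for item in normalize_xcom(xcom_value):
        print(f"XCom item: {item}, type: {type(item)}")
```

Because a dict is re-wrapped and a list is passed through, a pod that writes a top-level JSON list to return.json would be indistinguishable from the non-deferrable wrapper, so this workaround only suits JSON-object payloads.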
Here is an example to reproduce (note that you'll have to adjust the NAMESPACE and K8S_CONN_ID values):
```python
from datetime import datetime
from typing import Any

from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG

# Adjust NAMESPACE and K8S_CONN_ID for your cluster.
NAMESPACE = "python-projects"
IMAGE = "busybox:1.36"
K8S_CONN_ID = "kubernetes_gke"

# The pod writes this JSON object to the XCom file read by the operator.
XCOM_PAYLOAD = '{"hello": "world", "value": 42}'
PRODUCE_XCOM_CMD = [
    "/bin/sh",
    "-c",
    f"echo '{XCOM_PAYLOAD}' > /airflow/xcom/return.json",
]


def echo_xcom(xcom_value: Any) -> None:
    """Pull and log the XCom produced by the upstream KubernetesJobOperator."""
    print(f"XCom: {xcom_value}, type: {type(xcom_value)}")


with DAG(
    dag_id="xcom_deferrable_repro",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # The two producer tasks are identical except for the deferrable flag.
    produce_sync = KubernetesJobOperator(
        task_id="produce_xcom_sync",
        namespace=NAMESPACE,
        image=IMAGE,
        cmds=PRODUCE_XCOM_CMD,
        kubernetes_conn_id=K8S_CONN_ID,
        do_xcom_push=True,
        wait_until_job_complete=True,
        deferrable=False,
        get_logs=True,
    )
    produce_deferrable = KubernetesJobOperator(
        task_id="produce_xcom_deferrable",
        namespace=NAMESPACE,
        image=IMAGE,
        cmds=PRODUCE_XCOM_CMD,
        kubernetes_conn_id=K8S_CONN_ID,
        do_xcom_push=True,
        wait_until_job_complete=True,
        deferrable=True,
        get_logs=True,
    )

    # Each consumer logs the value and type of its upstream return_value XCom.
    echo_sync = PythonOperator(
        task_id="echo_xcom_sync",
        python_callable=echo_xcom,
        op_args=[produce_sync.output],
    )
    echo_deferrable = PythonOperator(
        task_id="echo_xcom_deferrable",
        python_callable=echo_xcom,
        op_args=[produce_deferrable.output],
    )

    produce_sync >> echo_sync
    produce_deferrable >> echo_deferrable
```
XCom returned by the deferrable task:
XCom returned by the non-deferrable task:
I suspect this is caused by the unwrap_single parameter: it defaults to True, but it only "unwraps" the XCom value in deferrable mode and is not used in non-deferrable mode:
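Deferrable mode (airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py, line 312 in 88bbf59):

```python
return xcom_results[0] if self.unwrap_single and len(xcom_results) == 1 else xcom_results
```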
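Non-deferrable mode: airflow/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py, line 258 in 88bbf59.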
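The divergence can be modelled in isolation. This is a simplified stand-in, not the operator's actual code: deferrable_result mirrors the deferrable-mode return expression quoted above, and non_deferrable_result reflects the reported behavior of returning the collected list unchanged. Both function names are hypothetical, and the input list is assumed to hold the parsed return.json payloads collected by the operator.

```python
from typing import Any


def deferrable_result(xcom_results: list[Any], unwrap_single: bool = True) -> Any:
    # Same expression as the deferrable-mode return statement quoted above.
    return xcom_results[0] if unwrap_single and len(xcom_results) == 1 else xcom_results


def non_deferrable_result(xcom_results: list[Any], unwrap_single: bool = True) -> Any:
    # Reported behavior: unwrap_single is accepted but never consulted.
    return xcom_results


payload = {"hello": "world", "value": 42}
print(type(deferrable_result([payload])).__name__)  # prints "dict"
print(type(non_deferrable_result([payload])).__name__)  # prints "list"
```

With a single result and the default unwrap_single=True, only the deferrable path unwraps, which matches the dict vs. list[dict] difference described above.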
What you think should happen instead?
I think the type of the return_value XCom returned by KubernetesJobOperator should be consistent and should not change depending on whether the task is executed in deferrable mode or not.
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Other Docker-based deployment
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes 10.12.4
Official Helm Chart version
Not Applicable
Kubernetes Version
Not Applicable
Helm Chart configuration
No response
Docker Image customizations
No response
Anything else?
No response
Are you willing to submit PR?
Code of Conduct