
KubernetesPodOperator does not return XCOM on pod failure #8792

Open
jvstein opened this issue May 8, 2020 · 14 comments · May be fixed by #37079
Labels
kind:bug (This is clearly a bug) · priority:medium (Bug that should be fixed before next release but would not block a release) · provider:cncf-kubernetes (Kubernetes provider related issues)

Comments


jvstein commented May 8, 2020

Apache Airflow version: 1.10.9

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.14.9

Environment:

  • Cloud provider or hardware configuration: AWS EKS
  • OS (e.g. from /etc/os-release): Linux (debian 9.12 inside docker image)
  • Kernel (e.g. uname -a): 5.4.0 (on my host)
  • Install tools:
  • Others:

What happened:

I ran a new task using the KubernetesPodOperator on our k8s cluster. This pod is designed to write to /airflow/xcom/return.json even in case of failure, so we can send a user-friendly error message in a following task. The pod exits with a non-zero exit code, so Airflow appropriately marks the task as failed, but the XCom values are not available.

What you expected to happen:

I expected XCOM variables to be available even on pod failure. We use this capability in other operators to signal error conditions and messages.

How to reproduce it:

Run a KubernetesPodOperator with a command like this in an alpine image.

/bin/sh -c "echo '{\"success\": false}' > /airflow/xcom/return.json; exit 1"

Check the XCOM results, which should include the JSON dictionary.

Anything else we need to know:

@jvstein jvstein added the kind:bug label May 8, 2020

boring-cyborg bot commented May 8, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

@mik-laj mik-laj added the k8s label Aug 10, 2020
@dimberman dimberman added the priority:medium label Sep 15, 2020
@Shivarp1

@jvstein Have you tried it in Airflow 1.10.12 version? ( k8s server version v1.17.4; k8s client version v1.15.3)
I am passing this to KubernetesPodOperator:
cmds=["/bin/bash", "-cx"],
arguments=['echo \'{"success": True}\' > /airflow/xcom/return.json; echo 0'],

The task returns success and starts the dependent follow-up task; however, it still gets no value from /airflow/xcom/return.json.
----- bash task log from the spawned k8s pod
{{pod_launcher.py:173}} INFO - Event: secondary-bash-62564f5389134383924eae918edb8ef6 had an event of type Succeeded
{{pod_launcher.py:287}} INFO - Event with job id secondary-bash-62564f5389134383924eae918edb8ef6 Succeeded
{{pod_launcher.py:156}} INFO - b'+ echo '{"success": True}'\n'
{{pod_launcher.py:156}} INFO - b'/bin/bash: /airflow/xcom/return.json: No such file or directory\n'
{{pod_launcher.py:156}} INFO - b'+ echo 0\n'
{{pod_launcher.py:156}} INFO - b'0\n'
-- followup task code..
retVal = task_instance.xcom_pull(task_ids='Secondary-bash-task')
print("Printing Secondary Task Return value. ")
print(retVal)

--- followup task log..
{{logging_mixin.py:112}} INFO - Printing Secondary Task Return value.
{{logging_mixin.py:112}} INFO - None
{{python_operator.py:114}} INFO - Done. Returned value was: True


jvstein commented Sep 22, 2020

@Shivarp1 - I have not tested in Airflow 1.10.12. Reading through the relevant section on the 1.10.12 tag, I suspect the same issue exists.

I just noticed that my repro steps had a bug in the command. It should have been exit 1 not echo 1 at the end. I updated the description.

We're currently using 1.10.9, with the following patch.

diff --git c/airflow/contrib/operators/kubernetes_pod_operator.py i/airflow/contrib/operators/kubernetes_pod_operator.py
index f692599d7..f4b970d7e 100644
--- c/airflow/contrib/operators/kubernetes_pod_operator.py
+++ i/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -20,6 +20,7 @@ import warnings

 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
+from airflow.models import XCOM_RETURN_KEY
 from airflow.utils.decorators import apply_defaults
 from airflow.contrib.kubernetes import kube_client, pod_generator, pod_launcher
 from airflow.contrib.kubernetes.pod import Resources
@@ -253,12 +254,13 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 if self.is_delete_operator_pod:
                     launcher.delete_pod(pod)

+            if self.do_xcom_push:
+                self.xcom_push(context, XCOM_RETURN_KEY, result)
+
             if final_state != State.SUCCESS:
                 raise AirflowException(
                     'Pod returned a failure: {state}'.format(state=final_state)
                 )
-            if self.do_xcom_push:
-                return result
         except AirflowException as ex:
             raise AirflowException('Pod Launching failed: {error}'.format(error=ex))

@Shivarp1

@jvstein one question:
How can we create the /airflow/xcom folder when the user in the newly spawned worker pod does not have root access?

when I try this ..
cmds=["RUN mkdir -p /airflow", "RUN chmod -R 777 /airflow", "/bin/bash", "-cx"],
arguments=['mkdir /airflow/xcom', 'echo \'{"success": True}\' > /airflow/xcom/return.json', 'exit 0'],
..
I get this error..
{{pod_launcher.py:156}} INFO - b'container_linux.go:235: starting container process caused "exec: \"RUN mkdir -p /airflow\": stat RUN mkdir -p /airflow: no such file or directory"\n'

Thanks


jvstein commented Sep 25, 2020

@Shivarp1 - You can't pass RUN statements (those are Dockerfile instructions) into the operator. You need to start from a pre-built image where those permissions are already present, or just run the pod as root.

Try this task definition:

task = KubernetesPodOperator(
    dag=dag,
    task_id="xcom_test",
    name="test_xcom_failure",
    namespace="default",
    in_cluster=False,
    config_file="/path/to/kube/config",
    is_delete_operator_pod=True,
    image_pull_policy="IfNotPresent",
    image="alpine:3.12",
    do_xcom_push=True,
    retries=0,
    cmds=["/bin/sh", "-c"],
    arguments=["mkdir -p /airflow/xcom; echo '{\"success\": false}' | tee /airflow/xcom/return.json; exit 1"],
)

@kaxil kaxil added the provider:cncf-kubernetes label and removed the area:k8s label Nov 18, 2020
@dstandish (Contributor)

@kaxil @jedcunningham is this desired behavior? i.e. to attempt to push xcom even in the case of failure?

@SacredSkull

The above is still true as of Airflow 2.2.2 - I would love to see this working!

Perhaps as an option do_xcom_push_on_failure?

As a work-around I've had to let the internal script pass and handle/check for the error in a downstream task and mark that as a failure.
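A minimal sketch of that workaround (names like "pod_task" are illustrative, not from this thread): the pod always exits 0 but records its status in /airflow/xcom/return.json, and a downstream task pulls the value and fails the run when it signals an error.

from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator


def check_pod_result(ti):
    # Pull the return value that the (always-succeeding) pod task wrote to /airflow/xcom/return.json.
    result = ti.xcom_pull(task_ids="pod_task")
    if not result or not result.get("success"):
        # Surface a user-friendly message and mark this task as failed.
        raise AirflowException(f"Upstream pod reported an error: {result}")


# Define this inside a `with DAG(...):` block.
check = PythonOperator(task_id="check_pod_result", python_callable=check_pod_result)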

@dstandish (Contributor)

Yeah, I think we should go ahead and make this with no option -- just make it push XCom in a finally or something. But we have to check on timing: there's a refactor of KPO in progress, and it may make sense to include the change as part of that.


kaxil commented Dec 8, 2021

Plus one to what Daniel said


eladkal commented Mar 16, 2023

Any operator can result in failure. Currently, as far as I can tell, we don't push information about the failure to XCom.
If we want to do this we should probably make it generic (as much as we can).

Personally, I would prefer this push not to be the default behavior.


potiuk commented Mar 17, 2023

Any operator can result in failure. Currently, as far as I can tell, we don't push information about the failure to XCom.
If we want to do this we should probably make it generic (as much as we can). Personally, I would prefer this push not to be the default behavior.

TL;DR: I would be for having push_on_failure as an option - but only for the few "generic" operators we have - KPO/Docker/Bash etc.

I thought a bit about this, and it is very much a philosophical issue :). While I have no single "only good" solution, intuitively I think pushing XCom on failure should be added as an option for "generic" operators - like KPO/Docker/Bash/Python - but we should not do it for "specific" operators.

Let me explain my line of thought - maybe that will help.

Generally speaking, the default behaviour for a "regular" operator is that it pushes to XCom whatever is returned by execute() (unless do_xcom_push is false). That's the current semantics. And when there is a failure we CANNOT push anything, because the execute() method does not return anything - unless we change these semantics.

Also doing so is a bit superfluous if you consider that the author of the operator might choose to do it on their own:

def execute(self, context):
    try:
        ...  # the operator's actual work
    except Exception:
        self.xcom_push(context, key="error", value="user-friendly failure details")
        raise

And they will be able to push a message if they really want to push something on failure. It's up to the author of the operator to define the behaviour. As a user, when you get an operator that does a specific thing, it is generally "closed" - it does what it says, you don't have much freedom there as a user; the author already made some decisions for you. Of course, as a user you can extend such an operator and change the behaviour by adding a similar try/except wrapper (see the sketch below).
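For illustration, a minimal sketch of such an extension applied to KPO (the subclass name and XCom key are arbitrary, and the import path assumes a recent cncf-kubernetes provider): catch the failure, push a marker to XCom, and re-raise so the task still fails.

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


class XComOnFailureKPO(KubernetesPodOperator):
    def execute(self, context):
        try:
            return super().execute(context)
        except Exception:
            # Push a simple marker before letting the task fail; downstream tasks
            # can read it with xcom_pull(key="failure_info").
            self.xcom_push(context, key="failure_info", value={"success": False})
            raise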

The thing with KPO (and a few other generic operators like Docker, Bash, Python) is that it is a generic operator - as a user you have more freedom to decide what happens and where - by providing a bash script, image, pod_template etc. - and this is where you also might get to decide what to do in case of failure. But... you cannot really (as a user) currently decide whether to push it or not (without extending the operator) - so suddenly the "generic" operators are not as generic any more. Yes, you can extend it - but the "generic" operators' philosophy is that they should not need to be extended, whereas for the "specific" operators, "extension" is the only way of changing the behaviour that the author of the operator made decisions on.

I am not super strong on this, but I wanted to explain not only what I think but also what led me to think this is the best approach. Maybe it will help others think it through.

@hussein-awala (Member)

I'm interested in this proposed feature (btw, it is not a bug). I tried to create something generic as @potiuk suggested, but it's too complicated: since we don't return any result on failure, the only way to do that is through the exception, which is not clean, and implementing it in Airflow core would complicate the support in the providers.

Since it will only be supported by a small set of operators, IMHO we have to handle it on a case-by-case basis. I created #37079 to implement it in KPO and tested it in normal and deferrable mode. I can add some tests to make it ready to merge if you agree to the proposal.

@hussein-awala (Member)

As a workaround, we can use the new callbacks class:

# Import paths assume a recent cncf-kubernetes provider and the kubernetes Python client.
from airflow.models.xcom import XCom
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase
from kubernetes.client import CoreV1Api
from kubernetes.client import models as k8s


class XComCallbacks(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_completion(*, pod: k8s.V1Pod, client: CoreV1Api, mode: str, **kwargs) -> None:
        def _construct_run_id(run_id: str):
            """re-construct the run_id from the safe label"""
            new_run_id = run_id.split("T")[0]
            rest = run_id.split("T")[1]
            new_run_id += "T"
            new_run_id += rest[:2] + ":" + rest[2:4] + ":" + rest[4:6]
            new_run_id += rest[6:13] + "+" + rest[13:15] + ":" + rest[15:17]
            return new_run_id

        if (pod.status.phase if hasattr(pod, "status") else None) != PodPhase.SUCCEEDED:
            pod_manager = PodManager(kube_client=client)
            pod_manager.await_xcom_sidecar_container_start(pod=pod)
            result = pod_manager.extract_xcom(pod=pod)
            XCom.set(
                key="failure_result",
                value=result,
                task_id=pod.metadata.labels["task_id"],
                dag_id=pod.metadata.labels["dag_id"],
                run_id=_construct_run_id(pod.metadata.labels["run_id"]),
            ) 

And for deferrable mode, we can implement the same logic in on_operator_resuming.
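For completeness, a minimal sketch of wiring the callback into a task (the task values are placeholders; the callbacks parameter is assumed to be the one introduced together with KubernetesPodOperatorCallback, and the import path assumes a recent cncf-kubernetes provider):

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

failing_task = KubernetesPodOperator(
    task_id="xcom_on_failure_test",
    image="alpine:3.12",
    cmds=["/bin/sh", "-c"],
    arguments=["mkdir -p /airflow/xcom; echo '{\"success\": false}' > /airflow/xcom/return.json; exit 1"],
    do_xcom_push=True,  # keeps the xcom sidecar so extract_xcom can read the file
    callbacks=XComCallbacks,  # the callback class defined above
)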

Happy to find a new use case for this feature 😄

@marlena-hammond

I am interested in this feature. For my use case, it would be very helpful to be able to retry the task/DAG based on the exception that is returned.
