Fix template_ext processing for Kubernetes Pod Operator #17186

Closed
potiuk opened this issue Jul 23, 2021 · 9 comments · Fixed by #17760
Labels
area:core kind:bug This is a clearly a bug

Comments

@potiuk
Member

potiuk commented Jul 23, 2021

The "template_ext" mechanism is useful for automatically loading and Jinja-processing files that are specified in Operator parameters. However, it can cause problems. For example (from a Slack conversation):

The template_fields in KubernetesPodOperator seem to cause the error jinja2.exceptions.TemplateNotFound when certain characters appear in a value, e.g. / in env_vars.
The following code raises the error:
env_vars=[
            k8s.V1EnvVar(
                name="GOOGLE_APPLICATION_CREDENTIALS",
                value="/var/secrets/google/service-account.json",
            ),

I believe the behaviour changed compared to the not-so-distant past. Some of the changes that made Jinja process parameters recursively caused this template behaviour to also be applied to nested parameters like the one above.

There were also several discussions and user confusion with this behaviour: #15942, #16922

There are two ways we could improve the situation:

  1. limit the "template_extension" resolution to only direct string kwargs passed to the operator (I think this is a no-brainer and we should do it)

  2. propose some "escaping" mechanism, where you could either disable template_extension processing entirely or somehow mark the parameters that should not be treated as templates.

Here I have several proposals:

a) we could add "skip_template_ext_processing" or similar parameter in BaseOperator <- I do not like it as many operators rely on this behaviour for a good reason
b) we could add "template_ext" parameter in the operator that could override the original class-level-field <- I like this one a lot
c) we could add a "disable_template_ext_pattern" (str) parameter where we could specify a list of regexps that filter out only specific values <- this one would allow disabling template_ext much more "selectively", only for certain parameters.
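
As a rough illustration of proposals (b) and (c), the sketch below uses a hypothetical `SketchOperator` class rather than Airflow's `BaseOperator`. The `template_ext` constructor argument and the `disable_template_ext_pattern` argument are the parameters proposed above and do not exist in Airflow; the default extensions are assumed for the example.

```python
import re
from typing import Optional, Sequence


class SketchOperator:
    """Hypothetical stand-in for an operator; this is not Airflow's BaseOperator."""

    # Class-level default, assumed for this example.
    template_ext: Sequence[str] = (".sh", ".json")

    def __init__(
        self,
        template_ext: Optional[Sequence[str]] = None,
        disable_template_ext_pattern: Sequence[str] = (),
    ) -> None:
        # Proposal (b): an instance-level value overrides the class-level field.
        if template_ext is not None:
            self.template_ext = tuple(template_ext)
        # Proposal (c): values matching any of these regexps are never loaded as files.
        self._disabled = [re.compile(p) for p in disable_template_ext_pattern]

    def is_template_file(self, value: str) -> bool:
        if any(pattern.search(value) for pattern in self._disabled):
            return False
        return any(value.endswith(ext) for ext in self.template_ext)


path = "/var/secrets/google/service-account.json"
default_op = SketchOperator()
no_ext_op = SketchOperator(template_ext=())
selective_op = SketchOperator(disable_template_ext_pattern=[r"^/var/secrets/"])

print(default_op.is_template_file(path))      # treated as a template file
print(no_ext_op.is_template_file(path))       # (b) extension handling switched off
print(selective_op.is_template_file(path))    # (c) this path is excluded
print(selective_op.is_template_file("q.sh"))  # (c) other values still match
```

With (b) the opt-out applies to every parameter of that operator instance, while (c) leaves extension handling on and excludes only the values that match.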

UPDATE: It only affects the Kubernetes Pod Operator due to its recursive behaviour and should be fixed there.

@potiuk potiuk added kind:feature Feature Requests area:core labels Jul 23, 2021
@uranusjr
Member

In my ideal world, passing a path as an argument shouldn't need special escaping, and we should require something like

BashOperator(bash_command=Template('/path/to/my-script.sh'))

to trigger templating.

More realistically, though, something like

MyOperator(path=Literal('/path/to/my-file.json'))

should work?
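
A minimal sketch of how such a marker could behave, assuming a hypothetical `Literal` wrapper class (unrelated to `typing.Literal`) and a hypothetical `classify` helper. Neither exists in Airflow; the example only shows how a renderer might tell the three cases apart.

```python
from dataclasses import dataclass
from typing import Sequence, Union


@dataclass(frozen=True)
class Literal:
    """Hypothetical marker (not typing.Literal): the wrapped string is never templated."""

    value: str


def classify(content: Union[str, Literal], template_ext: Sequence[str]) -> str:
    """Report how a renderer could treat the value; a sketch, not Airflow code."""
    if isinstance(content, Literal):
        return "literal"
    if any(content.endswith(ext) for ext in template_ext):
        return "template file"
    return "inline template"


exts = (".json", ".sh")
print(classify("/path/to/my-file.json", exts))
print(classify(Literal("/path/to/my-file.json"), exts))
print(classify("echo {{ ds }}", exts))
```

The check for the marker has to come before the extension check, otherwise the wrapped path would still be looked up as a file.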

@potiuk
Member Author

potiuk commented Jul 23, 2021

The problem is that we already have operators that heavily rely on this behaviour and will replace ANY string ending with the designated extensions with the jinja-processed value of the files they are pointing to.

This is convenient, though yes, super-ambiguous in a number of cases, and I think we need a tactical solution for now to handle the confusion it creates. I think the "recursive" behaviour is relatively new and unintended, and we could address it in a patch-level release, classifying it as a "bug" rather than a breaking change.

If we do the above, I think we might not even need the "escaping" and could go straight to implementing some "explicit" way of marking the template files (a variant of what you proposed, @uranusjr), deprecating the implicit extension handling with a warning and targeting its removal for Airflow 3.

BTW - I thought Literal (https://www.python.org/dev/peps/pep-0586/) is a "type-checking" feature only, not an actual class you can instantiate? Am I wrong :D ?

@uranusjr
Member

Oh, of course we shouldn't use typing.Literal. I was just borrowing the name and should've clarified. We should implement something like from airflow import Literal.

@kaxil
Member

kaxil commented Jul 28, 2021

Airflow shouldn't try to template the following. It's a bug -- for recursive rendering we look for the template_fields attribute (https://github.com/apache/airflow/pull/4743/files#diff-b5ec97b8301ab688b7e045426f24d485). k8s.V1EnvVar should not have that attribute, so Airflow should not try to render it as a template. Need to look at it in more detail 👀 .

env_vars=[
            k8s.V1EnvVar(
                name="GOOGLE_APPLICATION_CREDENTIALS",
                value="/var/secrets/google/service-account.json",
            ),

@potiuk
Member Author

potiuk commented Jul 28, 2021

But we have 'env_vars' in K8SPodOperator template_fields:

So I think this behaviour is pretty expected with the current state of "recursive" behaviour.

(and I find it pretty useful to be honest)

@kaxil
Member

kaxil commented Jul 28, 2021

I think you are confusing 2 things:

  1. Recursive classes

(
    # check nested fields can be templated
    ClassWithCustomAttributes(att1="{{ foo }}_1", att2="{{ foo }}_2", template_fields=["att1"]),
    {"foo": "bar"},
    ClassWithCustomAttributes(att1="bar_1", att2="{{ foo }}_2", template_fields=["att1"]),
),
(
    # check deep nested fields can be templated
    ClassWithCustomAttributes(
        nested1=ClassWithCustomAttributes(
            att1="{{ foo }}_1", att2="{{ foo }}_2", template_fields=["att1"]
        ),
        nested2=ClassWithCustomAttributes(
            att3="{{ foo }}_3", att4="{{ foo }}_4", template_fields=["att3"]
        ),
        template_fields=["nested1"],
    ),
    {"foo": "bar"},
    ClassWithCustomAttributes(
        nested1=ClassWithCustomAttributes(
            att1="bar_1", att2="{{ foo }}_2", template_fields=["att1"]
        ),
        nested2=ClassWithCustomAttributes(
            att3="{{ foo }}_3", att4="{{ foo }}_4", template_fields=["att3"]
        ),
        template_fields=["nested1"],
    ),
),
(
    # check null value on nested template field
    ClassWithCustomAttributes(att1=None, template_fields=["att1"]),
    {},
    ClassWithCustomAttributes(att1=None, template_fields=["att1"]),
),
(
    # check there is no RecursionError on circular references
    object1,
    {"foo": "bar"},
    object1,
),

  2. Iterable (list, dict, tuple, etc.)

("{{ foo }}", {"foo": "bar"}, "bar"),
(["{{ foo }}_1", "{{ foo }}_2"], {"foo": "bar"}, ["bar_1", "bar_2"]),
(("{{ foo }}_1", "{{ foo }}_2"), {"foo": "bar"}, ("bar_1", "bar_2")),

Logic - Flow of code:

if isinstance(content, str):
    if any(content.endswith(ext) for ext in self.template_ext):
        # Content contains a filepath
        return jinja_env.get_template(content).render(**context)
    else:
        return jinja_env.from_string(content).render(**context)
elif isinstance(content, (XComArg, DagParam)):
    return content.resolve(context)
if isinstance(content, tuple):
    if type(content) is not tuple:
        # Special case for named tuples
        return content.__class__(
            *(self.render_template(element, context, jinja_env) for element in content)
        )
    else:
        return tuple(self.render_template(element, context, jinja_env) for element in content)
elif isinstance(content, list):
    return [self.render_template(element, context, jinja_env) for element in content]
elif isinstance(content, dict):
    return {key: self.render_template(value, context, jinja_env) for key, value in content.items()}
elif isinstance(content, set):
    return {self.render_template(element, context, jinja_env) for element in content}
else:
    if seen_oids is None:
        seen_oids = set()
    self._render_nested_template_fields(content, context, jinja_env, seen_oids)
    return content
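
The dispatch above can be tried out with a simplified, standard-library-only sketch. This is not Airflow's code: the `render` function, the `files` dict standing in for the Jinja loader, the local `TemplateNotFound` class, and the regex substitution standing in for Jinja rendering are all assumptions made for the example, as is `.json` being one of the configured extensions. The `cmds` value is the one from the traceback reported for this issue.

```python
import re
from typing import Any, Dict, Sequence


class TemplateNotFound(LookupError):
    """Stand-in for jinja2.exceptions.TemplateNotFound."""


def render(content: Any, context: Dict[str, str], template_ext: Sequence[str], files: Dict[str, str]) -> Any:
    """Simplified version of the dispatch; `files` plays the role of the template loader."""
    if isinstance(content, str):
        if any(content.endswith(ext) for ext in template_ext):
            # The whole string is taken as a template file name.
            if content not in files:
                raise TemplateNotFound(content)
            content = files[content]
        # Minimal "{{ name }}" substitution standing in for Jinja rendering.
        return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: context[m.group(1)], content)
    if isinstance(content, tuple):
        return tuple(render(item, context, template_ext, files) for item in content)
    if isinstance(content, list):
        return [render(item, context, template_ext, files) for item in content]
    if isinstance(content, dict):
        return {key: render(value, context, template_ext, files) for key, value in content.items()}
    return content  # nested objects are out of scope for this sketch


print(render(["{{ foo }}_1", "{{ foo }}_2"], {"foo": "bar"}, (".json",), {}))

cmds = ["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"]
try:
    render(cmds, {}, (".json",), {})
except TemplateNotFound as exc:
    print(f"TemplateNotFound: {exc}")
```

The extension check looks only at the end of the string, so a whole shell command that happens to end in `.json` is handed to the loader as if it were a file name.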

For nested it should do:

def _render_nested_template_fields(
    self, content: Any, context: Dict, jinja_env: jinja2.Environment, seen_oids: Set
) -> None:
    if id(content) not in seen_oids:
        seen_oids.add(id(content))
        try:
            nested_template_fields = content.template_fields
        except AttributeError:
            # content has no inner template fields
            return
        self._do_render_template_fields(content, nested_template_fields, context, jinja_env, seen_oids)

Since env_vars is a list, the items inside that list would be rendered if they were strings, but since each item is a k8s.V1EnvVar instance, which does not have a template_fields attribute, it should not be rendered.

The only reason it is doing that is we override that method for KPO:

def _render_nested_template_fields(
    self,
    content: Any,
    context: Dict,
    jinja_env: "jinja2.Environment",
    seen_oids: set,
) -> None:
    if id(content) not in seen_oids and isinstance(content, k8s.V1EnvVar):
        seen_oids.add(id(content))
        self._do_render_template_fields(content, ('value', 'name'), context, jinja_env, seen_oids)
        return
    super()._render_nested_template_fields(
        content,
        context,
        jinja_env,
        seen_oids
    )

So the issue / bug is because of a fix in #14123 - and it does not affect any other operators.
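
The difference between the base method and the KubernetesPodOperator override can be sketched without Airflow installed. Everything below is a hypothetical stand-in: `EnvVar` mimics `k8s.V1EnvVar`, `BaseSketch` and `PodSketch` mimic the two operators, `.json` is assumed to be in `template_ext`, and the `seen_oids` cycle tracking is left out for brevity.

```python
from typing import Any, Sequence


class TemplateNotFound(LookupError):
    """Stand-in for jinja2.exceptions.TemplateNotFound."""


class EnvVar:
    """Hypothetical stand-in for k8s.V1EnvVar: it has no template_fields attribute."""

    def __init__(self, name: str, value: str) -> None:
        self.name = name
        self.value = value


class BaseSketch:
    template_ext: Sequence[str] = (".json",)  # assumed extension list

    def render(self, content: Any) -> Any:
        if isinstance(content, str):
            if any(content.endswith(ext) for ext in self.template_ext):
                # No template files exist in this sketch, so every lookup fails.
                raise TemplateNotFound(content)
            return content
        if isinstance(content, list):
            return [self.render(item) for item in content]
        self.render_nested(content)
        return content

    def render_nested(self, content: Any) -> None:
        # Base behaviour: only objects that declare template_fields are rendered.
        for field in getattr(content, "template_fields", ()):
            setattr(content, field, self.render(getattr(content, field)))


class PodSketch(BaseSketch):
    def render_nested(self, content: Any) -> None:
        # Override: EnvVar is always rendered, even without template_fields.
        if isinstance(content, EnvVar):
            for field in ("value", "name"):
                setattr(content, field, self.render(getattr(content, field)))
            return
        super().render_nested(content)


env_vars = [EnvVar("GOOGLE_APPLICATION_CREDENTIALS", "/var/secrets/google/service-account.json")]

BaseSketch().render(env_vars)  # the EnvVar is left untouched
print(env_vars[0].value)

try:
    PodSketch().render(env_vars)
except TemplateNotFound as exc:
    print(f"TemplateNotFound: {exc}")
```

In this sketch only the subclass reaches the extension check for the env var value, which matches the observation that other operators are unaffected.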

@potiuk
Member Author

potiuk commented Jul 28, 2021

Ah ok. I see now :)

@potiuk potiuk added this to the Airflow 2.1.3 milestone Aug 3, 2021
@potiuk potiuk changed the title Smarter template_ext processing Fix template_ext processing for Kubernetes Pod Operator Aug 3, 2021
@potiuk
Member Author

potiuk commented Aug 3, 2021

I have changed the name to correctly reflect the discussion results.

@potiuk potiuk added kind:bug This is a clearly a bug and removed kind:feature Feature Requests labels Aug 3, 2021
@kaxil kaxil removed this from the Airflow 2.1.3 milestone Aug 9, 2021
kaxil added a commit to astronomer/airflow that referenced this issue Aug 21, 2021
This commit effectively reverts apache#15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

Error:

```
[2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00
[2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task
[2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l']
[2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom
[2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710
[2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks
    self.render_templates(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates
    self.task.render_template_fields(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template
    return [self.render_template(element, context, jinja_env) for element in content]
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp>
    return [self.render_template(element, context, jinja_env) for element in content]
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template
    return jinja_env.get_template(content).render(**context)
  File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template
    return self._load_template(name, self.make_globals(globals))
  File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template
    template = self.loader.load(self, name, globals)
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load
    source, filename, uptodate = self.get_source(environment, name)
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
[2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354
[2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1
```

Dag:

```
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.utils.dates import days_ago
from airflow.configuration import conf

namespace = conf.get("kubernetes", "NAMESPACE")

# This will detect the default namespace locally and read the
# environment namespace when deployed to Astronomer.
if namespace == "default":
    config_file = "/usr/local/airflow/include/.kube/config"
    in_cluster = False
else:
    in_cluster = True
    config_file = None

default_args = {
    "owner": "airflow",
}

with DAG(
    dag_id="k8_pod_operator_xcom",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["k8"],
) as dag:

    write_xcom = KubernetesPodOperator(
        namespace=namespace,
        in_cluster=in_cluster,
        config_file=config_file,
        image="ubuntu",
        cmds=[
            "sh",
            "-c",
            "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json",
        ],
        name="write-xcom",
        do_xcom_push=True,
        is_delete_operator_pod=True,
        task_id="write-xcom",
        get_logs=True,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result

```

closes apache#17186
kaxil added a commit that referenced this issue Aug 21, 2021
This commit effectively reverts #15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

closes #17186
kaxil added a commit to astronomer/airflow that referenced this issue Aug 23, 2021
apache#17186 -- This has made the XCom functionality not work with KubernetesPodOperator, this has been fixed by apache#17760 -- so we should get this out sooner rather than later as recently released Airflow 2.1.3 will pull in latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`
potiuk pushed a commit that referenced this issue Aug 24, 2021
#17186 -- This has made the XCom functionality not work with KubernetesPodOperator, this has been fixed by #17760 -- so we should get this out sooner rather than later as recently released Airflow 2.1.3 will pull in latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`
@raphaelauv
Contributor

raphaelauv commented Sep 29, 2021

For people confused by release 2.0.2:

the commit Fix using XCom with ''KubernetesPodOperator'' (#17760) mostly fixes the templating error for the K8S secrets,

so don't use 2.0.1!

leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Mar 10, 2022
This commit effectively reverts apache/airflow#15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

closes apache/airflow#17186

GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Mar 10, 2022
apache/airflow#17186 -- This has made the XCom functionality not work with KubernetesPodOperator, this has been fixed by apache/airflow#17760 -- so we should get this out sooner rather than later as recently released Airflow 2.1.3 will pull in latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`

GitOrigin-RevId: bb5602c652988d0b31ea5e0db8f03725a2f22d34
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Jun 4, 2022
This commit effectively reverts apache/airflow#15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

closes apache/airflow#17186

GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Jun 4, 2022
apache/airflow#17186 -- This broke the XCom functionality of KubernetesPodOperator and has been fixed by apache/airflow#17760, so we should get this out sooner rather than later, as the recently released Airflow 2.1.3 will pull in the latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`

GitOrigin-RevId: bb5602c652988d0b31ea5e0db8f03725a2f22d34
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Jul 10, 2022
This commit effectively reverts apache/airflow#15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

Error:

```
[2021-07-26 20:23:54,109] {taskinstance.py:1108} INFO - Executing <Task(KubernetesPodOperator): write-xcom> on 2021-07-26T20:23:27.058907+00:00
[2021-07-26 20:23:54,113] {standard_task_runner.py:52} INFO - Started process 11 to run task
[2021-07-26 20:23:54,297] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'k8_pod_operator_xcom', 'write-xcom', '2021-07-26T20:23:27.058907+00:00', '--job-id', '1757', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/k8s_xcom_example.py', '--cfg-path', '/tmp/tmp0q94pkhs', '--error-file', '/tmp/tmpoz9qqp2l']
[2021-07-26 20:23:54,298] {standard_task_runner.py:77} INFO - Job 1757: Subtask write-xcom
[2021-07-26 20:23:54,511] {logging_mixin.py:104} INFO - Running <TaskInstance: k8_pod_operator_xcom.write-xcom 2021-07-26T20:23:27.058907+00:00 [running]> on host k8podoperatorxcomwritexcom.21384021df914227ad4e4b3a34313710
[2021-07-26 20:23:54,713] {taskinstance.py:1502} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1158, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1295, in _prepare_and_execute_task_with_callbacks
    self.render_templates(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1796, in render_templates
    self.task.render_template_fields(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 999, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1012, in _do_render_template_fields
    rendered_content = self.render_template(content, context, jinja_env, seen_oids)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in render_template
    return [self.render_template(element, context, jinja_env) for element in content]
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1063, in <listcomp>
    return [self.render_template(element, context, jinja_env) for element in content]
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 1047, in render_template
    return jinja_env.get_template(content).render(**context)
  File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 883, in get_template
    return self._load_template(name, self.make_globals(globals))
  File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 857, in _load_template
    template = self.loader.load(self, name, globals)
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 115, in load
    source, filename, uptodate = self.get_source(environment, name)
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 197, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json
[2021-07-26 20:23:54,797] {taskinstance.py:1552} INFO - Marking task as FAILED. dag_id=k8_pod_operator_xcom, task_id=write-xcom, execution_date=20210726T202327, start_date=20210726T202353, end_date=20210726T202354
[2021-07-26 20:23:54,936] {local_task_job.py:153} INFO - Task exited with return code 1
```

Dag:

```
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.utils.dates import days_ago
from airflow.configuration import conf

namespace = conf.get("kubernetes", "NAMESPACE")

# This will detect the default namespace locally and read the
# environment namespace when deployed to Astronomer.
if namespace == "default":
    config_file = "/usr/local/airflow/include/.kube/config"
    in_cluster = False
else:
    in_cluster = True
    config_file = None

default_args = {
    "owner": "airflow",
}

with DAG(
    dag_id="k8_pod_operator_xcom",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["k8"],
) as dag:

    write_xcom = KubernetesPodOperator(
        namespace=namespace,
        in_cluster=in_cluster,
        config_file=config_file,
        image="ubuntu",
        cmds=[
            "sh",
            "-c",
            "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json",
        ],
        name="write-xcom",
        do_xcom_push=True,
        is_delete_operator_pod=True,
        task_id="write-xcom",
        get_logs=True,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result

```

closes apache/airflow#17186

GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
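
For readers following the traceback above: the last Airflow frame is `jinja_env.get_template(content)`, meaning the command string was looked up as a template file instead of being rendered inline. Below is a minimal sketch of that decision, assuming the operator's `template_ext` contained `yaml`, `yml`, and `json` at the time (an assumption inferred from the error, not copied from the Airflow source). `TEMPLATE_EXT` and `render_mode` are hypothetical names used only for illustration.

```python
from typing import Any, Iterable, Tuple

# Assumed for illustration: the extensions declared in the operator's template_ext.
TEMPLATE_EXT: Tuple[str, ...] = ("yaml", "yml", "json")


def render_mode(value: Any, template_ext: Iterable[str] = TEMPLATE_EXT) -> Any:
    """Report how each string would be handled: as a template file or inline.

    Lists and tuples are walked recursively, mirroring the list comprehension
    visible in the traceback.
    """
    extensions = tuple(template_ext)
    if isinstance(value, str):
        # A plain suffix check: any string ending in a listed extension is
        # treated as a file name, even when it is really a shell command.
        if any(value.endswith(ext) for ext in extensions):
            return "file"
        return "inline"
    if isinstance(value, (list, tuple)):
        return [render_mode(element, extensions) for element in value]
    return "skip"


cmds = [
    "sh",
    "-c",
    "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json",
]

print(render_mode(cmds))      # ['inline', 'inline', 'file']
print(render_mode(cmds, ()))  # ['inline', 'inline', 'inline']
```

The third element ends in `json`, so it would be handed to the template loader as a file name, which matches the `TemplateNotFound` message quoting the whole command. With an empty `template_ext` every element is rendered inline, which would be consistent with the revert described in the commit message.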
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Jul 10, 2022
apache/airflow#17186 -- This broke the XCom functionality of KubernetesPodOperator and has been fixed by apache/airflow#17760, so we should get this out sooner rather than later, as the recently released Airflow 2.1.3 will pull in the latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`

GitOrigin-RevId: bb5602c652988d0b31ea5e0db8f03725a2f22d34
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Aug 27, 2022
This commit effectively reverts apache/airflow#15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

closes apache/airflow#17186

GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Aug 27, 2022
apache/airflow#17186 -- This broke the XCom functionality of KubernetesPodOperator and has been fixed by apache/airflow#17760, so we should get this out sooner rather than later, as the recently released Airflow 2.1.3 will pull in the latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`

GitOrigin-RevId: bb5602c652988d0b31ea5e0db8f03725a2f22d34
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Oct 4, 2022
This commit effectively reverts apache/airflow#15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

closes apache/airflow#17186

GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Oct 4, 2022
apache/airflow#17186 -- This broke the XCom functionality of KubernetesPodOperator and has been fixed by apache/airflow#17760, so we should get this out sooner rather than later, as the recently released Airflow 2.1.3 will pull in the latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`

GitOrigin-RevId: bb5602c652988d0b31ea5e0db8f03725a2f22d34
aglipska pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Oct 7, 2022
This commit effectively reverts apache/airflow#15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

closes apache/airflow#17186

GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
aglipska pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Oct 7, 2022
apache/airflow#17186 -- This broke the XCom functionality of KubernetesPodOperator and has been fixed by apache/airflow#17760, so we should get this out sooner rather than later, as the recently released Airflow 2.1.3 will pull in the latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`

GitOrigin-RevId: bb5602c652988d0b31ea5e0db8f03725a2f22d34
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Dec 7, 2022
This commit effectively reverts apache/airflow#15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

closes apache/airflow#17186

GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Dec 7, 2022
apache/airflow#17186 -- This broke the XCom functionality of KubernetesPodOperator and has been fixed by apache/airflow#17760, so we should get this out sooner rather than later, as the recently released Airflow 2.1.3 will pull in the latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`

GitOrigin-RevId: bb5602c652988d0b31ea5e0db8f03725a2f22d34
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Jan 27, 2023
This commit effectively reverts apache/airflow#15942 because of the issues it is causing. Until we find a better solution, we should revert this change.

Dag:

```
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.utils.dates import days_ago
from airflow.configuration import conf

namespace = conf.get("kubernetes", "NAMESPACE")

# This will detect the default namespace locally and read the
# environment namespace when deployed to Astronomer.
if namespace == "default":
    config_file = "/usr/local/airflow/include/.kube/config"
    in_cluster = False
else:
    in_cluster = True
    config_file = None

default_args = {
    "owner": "airflow",
}

with DAG(
    dag_id="k8_pod_operator_xcom",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["k8"],
) as dag:

    write_xcom = KubernetesPodOperator(
        namespace=namespace,
        in_cluster=in_cluster,
        config_file=config_file,
        image="ubuntu",
        cmds=[
            "sh",
            "-c",
            "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json",
        ],
        name="write-xcom",
        do_xcom_push=True,
        is_delete_operator_pod=True,
        task_id="write-xcom",
        get_logs=True,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result

```
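For anyone reading the traceback above: the failing call is `jinja_env.get_template(content)`, which is the branch `render_template` takes when a string ends with one of the operator's `template_ext` suffixes. The last element of `cmds` ends in `return.json`, so it appears to be treated as a template file name rather than an inline string. Below is a minimal, standard-library-only sketch of that decision. The helper names and the `TEMPLATE_EXT` values are assumptions for illustration, not Airflow's actual code.

```python
from typing import Any, List, Sequence

# Assumed for illustration only; the operator's real template_ext may differ.
TEMPLATE_EXT: Sequence[str] = (".json", ".yaml", ".yml")


def is_treated_as_template_file(
    value: Any, template_ext: Sequence[str] = TEMPLATE_EXT
) -> bool:
    """Return True when a string would be looked up as a template file name."""
    return isinstance(value, str) and value.endswith(tuple(template_ext))


def find_file_lookups(
    content: Any, template_ext: Sequence[str] = TEMPLATE_EXT
) -> List[str]:
    """Recursively collect nested strings that would trigger a file lookup."""
    if isinstance(content, str):
        return [content] if is_treated_as_template_file(content, template_ext) else []
    if isinstance(content, (list, tuple)):
        hits: List[str] = []
        for element in content:
            hits.extend(find_file_lookups(element, template_ext))
        return hits
    return []


# Same cmds as in the DAG above.
cmds = [
    "sh",
    "-c",
    "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json",
]

# Only the shell command ending in ".json" is flagged.
print(find_file_lookups(cmds))
```

Under these assumptions, only the third element is flagged, which matches the string reported in the `TemplateNotFound` message. Restricting the suffix check to top-level string arguments, as proposed in option 1 of the issue description, would leave nested values like this one to be rendered as inline templates.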

closes apache/airflow#17186

GitOrigin-RevId: 73d2b720e0c79323a29741882a07eb8962256762
leahecole pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Jan 27, 2023
The bug described in apache/airflow#17186 broke XCom functionality with KubernetesPodOperator and has been fixed by apache/airflow#17760. We should get this out sooner rather than later, because the recently released Airflow 2.1.3 will pull in the latest kubernetes provider when we run `pip install -U apache-airflow[cncf.kubernetes]`.

GitOrigin-RevId: bb5602c652988d0b31ea5e0db8f03725a2f22d34