Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIP-31] Implement XComArg model to functionally pass output from one operator to the next #8652

Merged
merged 13 commits into from
May 9, 2020
Merged
30 changes: 26 additions & 4 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,15 @@ def render_template( # pylint: disable=too-many-return-statements
if not jinja_env:
jinja_env = self.get_template_env()

from airflow.models.xcom_arg import XComArg
jonathanshir marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(content, str):
if any(content.endswith(ext) for ext in self.template_ext):
# Content contains a filepath
return jinja_env.get_template(content).render(**context)
else:
return jinja_env.from_string(content).render(**context)
elif isinstance(content, XComArg):
return content.resolve(context)

if isinstance(content, tuple):
if type(content) is not tuple: # pylint: disable=unidiomatic-typecheck
Expand Down Expand Up @@ -1064,10 +1067,23 @@ def _set_relatives(self,
task_or_task_list: Union['BaseOperator', List['BaseOperator']],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to adjust this type annotation. But it will be hard due to circular imports.

upstream: bool = False) -> None:
"""Sets relatives for the task or task list."""
try:
task_list = list(task_or_task_list) # type: ignore
except TypeError:
task_list = [task_or_task_list] # type: ignore
from airflow.models.xcom_arg import XComArg

if isinstance(task_or_task_list, XComArg):
# otherwise we will start to iterate over xcomarg
# because of the "list" check below
# with current XComArg.__getitem__ implementation
task_list = [task_or_task_list.operator]
else:
try:
task_list = list(task_or_task_list) # type: ignore
except TypeError:
task_list = [task_or_task_list] # type: ignore

task_list = [
t.operator if isinstance(t, XComArg) else t # type: ignore
for t in task_list
]

for task in task_list:
if not isinstance(task, BaseOperator):
Expand Down Expand Up @@ -1121,6 +1137,12 @@ def set_upstream(self, task_or_task_list: Union['BaseOperator', List['BaseOperat
"""
self._set_relatives(task_or_task_list, upstream=True)

@property
def output(self):
"""Returns default XComArg for the operator"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a bit more detail, something like Output from the previous task using XComArg or something similar so that is simpler to understand. WDYT ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"XCom representation of output received from this operator"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Returns reference to XCom pushed by current operator"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either of them is fine :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't updated this yet :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merging this PR ignoring this change (we can update it in a related follow-up PR)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in #8805

from airflow.models.xcom_arg import XComArg
return XComArg(operator=self)

@staticmethod
def xcom_push(
context: Any,
Expand Down
145 changes: 145 additions & 0 deletions airflow/models/xcom_arg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Any, Dict, List, Union

from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.models.xcom import XCOM_RETURN_KEY


class XComArg:
"""
Class that represents a XCom push from a previous operator.
Defaults to "return_value" as only key.

Current implementations supports
jonathanshir marked this conversation as resolved.
Show resolved Hide resolved
xcomarg >> op
xcomarg << op
op >> xcomarg (by BaseOperator code)
op << xcomarg (by BaseOperator code)

**Example**: The moment you get a result from any operator (functional or regular) you can ::

xcomarg = ...
my_op = MyOperator()
my_op >> xcomarg

jonathanshir marked this conversation as resolved.
Show resolved Hide resolved
This object can be used in legacy Operators via Jinja.

**Example**: You can make this result to be part of any generated string ::

xcomarg = ...
op1 = MyOperator(my_text_message=f"the value is {xcomarg}")
op2 = MyOperator(my_text_message=f"the value is {xcomarg['topic']}")

:param operator: operator to which the XComArg belongs to
:type operator: airflow.models.baseoperator.BaseOperator
:param key: key value which is used for xcom_pull (key in the XCom table)
:type key: str
"""

def __init__(self, operator: BaseOperator, key: str = XCOM_RETURN_KEY):
self._operator = operator
self._key = key

def __eq__(self, other):
return (self.operator == other.operator
and self.key == other.key)

def __lshift__(self, other):
"""
Implements xcomresult << op
jonathanshir marked this conversation as resolved.
Show resolved Hide resolved
"""
self.set_upstream(other)
return self

def __rshift__(self, other):
"""
Implements xcomresult >> op
jonathanshir marked this conversation as resolved.
Show resolved Hide resolved
"""
self.set_downstream(other)
return self

def __getitem__(self, item):
"""
Implements xcomresult['some_result_key']
"""
return XComArg(operator=self.operator, key=item)

def __str__(self):
"""
Backward compatibility for old-style jinja used in Airflow Operators

**Example**: to use XArg at BashOperator::
jonathanshir marked this conversation as resolved.
Show resolved Hide resolved

BashOperator(cmd=f"... { xcomarg } ...")

:return:
"""
xcom_pull_kwargs = [f"task_ids='{self.operator.task_id}'",
f"dag_id='{self.operator.dag.dag_id}'",
]
if self.key is not None:
xcom_pull_kwargs.append(f"key='{self.key}'")
Comment on lines +101 to +102
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that self.key will be None? The default value is XCOM_RETURN_KEY and as far as I understand the key has to be provided. If it's so, we should have a check for that in __init__.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Key is none is only possible if the base class is overridden (MyXComArg inherits XComArg but doesn't provide a key at all, for example).
I think this is good functionality in case your xcom system doesn't have a key necessarily

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, didn't we make XCom overwritable instead? I guess you can overwrite it and set it on the output property, but not sure if I understand the use for that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure exactly what you meant cas, can you elaborate on "overwritable instead"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is XComArg overridden? XComArg is not easy to be overwritten currently. Can you show an example use case where XComArg is overridden by a subclass?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XCom system itself supports key=None, so our goal with this was to allow that kind of usage, so if XCom backend uses key=None we already have support for it in XComArg.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm. Maybe we should still print that key value is None. But not blocking for me.


xcom_pull_kwargs = ", ".join(xcom_pull_kwargs)
xcom_pull = f"task_instance.xcom_pull({xcom_pull_kwargs})"
return xcom_pull

@property
def operator(self) -> BaseOperator:
"""Returns operator of this XComArg"""
return self._operator

@property
def key(self) -> str:
"""Returns keys of this XComArg"""
return self._key

def set_upstream(self, task_or_task_list: Union[BaseOperator, List[BaseOperator]]):
"""
Proxy to underlying operator set_upstream method
"""
self.operator.set_upstream(task_or_task_list)

def set_downstream(
self, task_or_task_list: Union[BaseOperator, List[BaseOperator]]
):
"""
Proxy to underlying operator set_downstream method
"""
self.operator.set_downstream(task_or_task_list)

def resolve(self, context: Dict) -> Any:
"""
Pull XCom value for the existing arg. This method is run during ``op.execute()``
in respectable context.
"""
resolved_value = self.operator.xcom_pull(
context=context,
task_ids=[self.operator.task_id],
key=str(self.key), # xcom_pull supports only key as str
dag_id=self.operator.dag.dag_id,
)
if not resolved_value:
raise AirflowException(
f'XComArg result from {self.operator.task_id} at {self.operator.dag.dag_id} '
f'with key="{self.key}"" is not found!')
resolved_value = resolved_value[0]

return resolved_value
1 change: 1 addition & 0 deletions scripts/ci/pylint_todo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
./airflow/models/taskinstance.py
./airflow/models/variable.py
./airflow/models/xcom.py
./airflow/models/xcom_arg.py
turbaszek marked this conversation as resolved.
Show resolved Hide resolved
./airflow/settings.py
./airflow/stats.py
./airflow/www/api/experimental/endpoints.py
Expand Down
146 changes: 146 additions & 0 deletions tests/models/test_xcom_arg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

DEFAULT_ARGS = {
"owner": "test",
"depends_on_past": True,
"start_date": datetime.today(),
"retries": 1,
"retry_delay": timedelta(minutes=1),
}

VALUE = 42


def assert_is_value(num: int):
assert num == VALUE


def build_python_op():
def f(task_id):
return f"OP:{task_id}"

with DAG(dag_id="test_xcom_dag", default_args=DEFAULT_ARGS):
operator = PythonOperator(
python_callable=f,
task_id="test_xcom_op",
do_xcom_push=True,
)
return operator


class TestXComArgBuild:
def test_xcom_ctor(self):
python_op = build_python_op()
actual = XComArg(python_op, "test_key")
assert actual
assert actual.operator == python_op
assert actual.key == "test_key"
assert actual == actual # pylint: disable=comparison-with-itself
jonathanshir marked this conversation as resolved.
Show resolved Hide resolved
jonathanshir marked this conversation as resolved.
Show resolved Hide resolved
assert str(actual)
jonathanshir marked this conversation as resolved.
Show resolved Hide resolved

def test_xcom_key_is_empty_str(self):
python_op = build_python_op()
actual = XComArg(python_op, key="")
assert actual.key == ""
assert str(actual) == "task_instance.xcom_pull(task_ids='test_xcom_op', " \
"dag_id='test_xcom_dag', key='')"

def test_set_downstream(self):
with DAG("test_set_downstream", default_args=DEFAULT_ARGS):
op_a = BashOperator(task_id="a", bash_command="echo a")
op_b = BashOperator(task_id="b", bash_command="echo b")
bash_op = BashOperator(task_id="c", bash_command="echo c")
xcom_args_a = XComArg(op_a)
xcom_args_b = XComArg(op_b)

xcom_args_a >> xcom_args_b >> bash_op

assert len(op_a.downstream_list) == 2
assert op_b in op_a.downstream_list
assert bash_op in op_a.downstream_list

def test_set_upstream(self):
with DAG("test_set_upstream", default_args=DEFAULT_ARGS):
op_a = BashOperator(task_id="a", bash_command="echo a")
op_b = BashOperator(task_id="b", bash_command="echo b")
bash_op = BashOperator(task_id="c", bash_command="echo c")
xcom_args_a = XComArg(op_a)
xcom_args_b = XComArg(op_b)

xcom_args_a << xcom_args_b << bash_op

assert len(op_a.upstream_list) == 2
assert op_b in op_a.upstream_list
assert bash_op in op_a.upstream_list

def test_xcom_arg_property_of_base_operator(self):
with DAG("test_xcom_arg_property_of_base_operator", default_args=DEFAULT_ARGS):
op_a = BashOperator(task_id="a", bash_command="echo a")

assert op_a.output == XComArg(op_a)

def test_xcom_key_getitem(self):
python_op = build_python_op()
actual = XComArg(python_op, key="another_key")
assert actual.key == "another_key"
actual_new_key = actual["another_key_2"]
assert actual_new_key.key == "another_key_2"


class TestXComArgRuntime:
def test_xcom_pass_to_op(self):
with DAG(dag_id="test_xcom_pass_to_op", default_args=DEFAULT_ARGS) as dag:
operator = PythonOperator(
python_callable=lambda: VALUE,
task_id="return_value_1",
do_xcom_push=True,
)
xarg = XComArg(operator)
operator2 = PythonOperator(
python_callable=assert_is_value,
op_args=[xarg],
task_id="assert_is_value_1",
)
operator >> operator2
dag.run(local=True)

def test_xcom_push_and_pass(self):
def push_xcom_value(key, value, **context):
ti = context["task_instance"]
ti.xcom_push(key, value)

with DAG(dag_id="test_xcom_push_and_pass", default_args=DEFAULT_ARGS) as dag:
op1 = PythonOperator(
python_callable=push_xcom_value,
task_id="push_xcom_value",
op_args=["my_key", VALUE],
)
xarg = XComArg(op1, key="my_key")
op2 = PythonOperator(
python_callable=assert_is_value,
task_id="assert_is_value_1",
op_args=[xarg],
)
op1 >> op2
dag.run(local=True)