[AIP-31] Implement XComArg model to functionally pass output from one operator to the next #8652

Merged
merged 13 commits into from
May 9, 2020

Conversation

jonathanshir
Contributor

@jonathanshir jonathanshir commented Apr 30, 2020

This PR is a part of [AIP-31], and closes the following issues:
closes #8055, closes #8053, closes #8052.
The goal of the XComArg model is to create a communication token that can be passed between operators. This lets us wire the output of one operator to the next in a functional and "comfortable" (code-writing-wise) manner.
Collaborated with: @turbaszek @casassg @evgenyshulman
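
To make the idea of a "communication token" concrete, here is a minimal sketch in plain Python with no Airflow imports. `FakeOperator`, `XComArgSketch`, `resolve`, and the `store` dict are hypothetical stand-ins and not the classes added in this PR; the sketch only assumes that the token records which operator produced a value and under which key.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple

# Assumed default key; Airflow's XCOM_RETURN_KEY constant has this value.
XCOM_RETURN_KEY = "return_value"


@dataclass(frozen=True)
class FakeOperator:
    """Hypothetical stand-in for an operator: only a task_id."""
    task_id: str


@dataclass(frozen=True)
class XComArgSketch:
    """Hypothetical token: points at a value without holding the value."""
    operator: FakeOperator
    key: Optional[str] = XCOM_RETURN_KEY

    def resolve(self, store: Dict[Tuple[str, Optional[str]], Any]) -> Any:
        # At run time the token is exchanged for the stored value.
        return store[(self.operator.task_id, self.key)]


# A dict plays the role of the XCom backend in this sketch.
store = {("get_ip", XCOM_RETURN_KEY): '{"origin": "203.0.113.7"}'}
token = XComArgSketch(FakeOperator("get_ip"))
resolved = token.resolve(store)
```

The token can be created while the DAG is being defined, before any value exists, and is only exchanged for the value when the downstream task runs.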


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

jonathanshir and others added 8 commits April 30, 2020 20:35
Added rendering of XComArg in base operator
Added xcom arg test, todo multiple keys
Add output property to BaseOperator

closes: apache#8055
adding support for runtime sub-item access
adding support for usage templated_args (legacy)
Made XCom unit tests run faster
@jonathanshir jonathanshir changed the title [AIP-31] Implement XComArg model to be passed between operators [AIP-31] Implement XComArg model to functionally pass output from one operator to the next Apr 30, 2020
@turbaszek turbaszek added the AIP-31 Task Flow API for nicer DAG definition label May 4, 2020
@@ -1064,10 +1069,23 @@ def _set_relatives(self,
task_or_task_list: Union['BaseOperator', List['BaseOperator']],
Member

I think we need to adjust this type annotation. But it will be hard due to circular imports.

Comment on lines +99 to +100
if self.key is not None:
    xcom_pull_kwargs.append(f"key='{self.key}'")
Member

Is it possible that self.key will be None? The default value is XCOM_RETURN_KEY and as far as I understand the key has to be provided. If it's so, we should have a check for that in __init__.

Contributor Author

The key being None is only possible if the base class is overridden (for example, MyXComArg inherits from XComArg but doesn't provide a key at all).
I think this is good functionality in case your XCom system doesn't necessarily have a key.

Contributor

Wait, didn't we make XCom overwritable instead? I guess you can overwrite it and set it on the output property, but not sure if I understand the use for that?

Contributor Author

Not sure exactly what you meant cas, can you elaborate on "overwritable instead"?

Contributor

How is XComArg overridden? XComArg is not easy to override currently. Can you show an example use case where XComArg is overridden by a subclass?

Contributor Author

XCom system itself supports key=None, so our goal with this was to allow that kind of usage, so if XCom backend uses key=None we already have support for it in XComArg.

Contributor

sgtm. Maybe we should still print that key value is None. But not blocking for me.
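
The behaviour discussed in this thread can be sketched in plain Python. Both functions below are hypothetical helpers written for illustration, not code from `airflow/models/xcom_arg.py`: the first mirrors the reviewed snippet by omitting `key` when it is None, and the second shows the suggested alternative of always printing the key, including `key=None`.

```python
from typing import Optional


def render_xcom_pull(task_id: str, key: Optional[str] = "return_value") -> str:
    """Build an xcom_pull call string, leaving out key when it is None."""
    xcom_pull_kwargs = [f"task_ids='{task_id}'"]
    if key is not None:
        xcom_pull_kwargs.append(f"key='{key}'")
    joined = ", ".join(xcom_pull_kwargs)
    return f"task_instance.xcom_pull({joined})"


def render_xcom_pull_explicit(task_id: str, key: Optional[str] = "return_value") -> str:
    """Variant that always prints the key, so key=None stays visible."""
    return f"task_instance.xcom_pull(task_ids={task_id!r}, key={key!r})"


with_key = render_xcom_pull("get_ip")
without_key = render_xcom_pull("get_ip", key=None)
explicit_none = render_xcom_pull_explicit("get_ip", key=None)
```

With the first form a reader cannot tell "no key given" from "key deliberately None"; the second form makes that distinction visible in the rendered string.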

Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>
@turbaszek
Member

It looks good to me. @kaxil @casassg @dimberman WDYT?

@jonathanshir jonathanshir marked this pull request as ready for review May 5, 2020 16:17
Contributor

@dimberman dimberman left a comment

LGTM! Great job @jonathanshir!

@@ -1121,6 +1139,12 @@ def set_upstream(self, task_or_task_list: Union['BaseOperator', List['BaseOperat
"""
self._set_relatives(task_or_task_list, upstream=True)

@property
def output(self):
    """Returns default XComArg for the operator"""
Member

Can we add a bit more detail, something like "Output from the previous task using XComArg" or something similar, so that it is simpler to understand. WDYT?

Contributor Author

"XCom representation of output received from this operator"?

Contributor

"Returns reference to XCom pushed by current operator"?

Member

Either of them is fine :)

Member

We haven't updated this yet :)

Member

Merging this PR ignoring this change (we can update it in a related follow-up PR)

Member

Addressed in #8805
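
For readers following this thread, the shape of the property under discussion can be sketched without Airflow. `OperatorSketch` and `XComArgSketch` are hypothetical stand-ins for `BaseOperator` and `XComArg`, and the docstring uses one of the wordings proposed above rather than whatever was finally merged.

```python
class XComArgSketch:
    """Hypothetical reference to a value pushed by an operator."""

    def __init__(self, operator, key="return_value"):
        self.operator = operator
        self.key = key


class OperatorSketch:
    """Hypothetical operator exposing its result as a reference."""

    def __init__(self, task_id):
        self.task_id = task_id

    @property
    def output(self):
        """Returns reference to XCom pushed by current operator"""
        # A new reference is built on each access; it holds no data itself.
        return XComArgSketch(operator=self)


get_ip = OperatorSketch("get_ip")
ref = get_ip.output
```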

Contributor

@casassg casassg left a comment

overall lgtm, added a comment on the self.key being null

Comments, as well as single assert statement in XComArg tests.

Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Member

@kaxil kaxil left a comment

Almost there, just one comment from me and I think a couple of questions from @casassg and @turbaszek

@turbaszek
Member

turbaszek commented May 7, 2020

@jonathanshir it seems that some of the tests are flaky or something:

File "/opt/airflow/tests/models/test_xcom_arg.py", line 130 in test_xcom_pass_to_op

In general, we are not running DAGs on CI. So I would suggest either marking TestXComArgRuntime with the system marker or removing/mocking this test.
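
As an illustration of the mocking option, the sketch below tests a downstream callable without running a DAG. It uses only `unittest.mock` from the standard library; `push_value`, `consume`, and the test itself are hypothetical and are not the contents of `tests/models/test_xcom_arg.py`. The only assumption about Airflow is that a task instance exposes `xcom_pull(task_ids=...)`.

```python
from unittest import mock


def push_value():
    """Hypothetical upstream callable whose return value would be pushed."""
    return "1.2.3.4"


def consume(ti):
    """Hypothetical downstream callable that pulls the upstream value."""
    return f"Server connected from {ti.xcom_pull(task_ids='push')}"


def test_consume_reads_upstream_value():
    # The task instance is mocked, so no scheduler or database is needed.
    ti = mock.Mock()
    ti.xcom_pull.return_value = push_value()

    assert consume(ti) == "Server connected from 1.2.3.4"
    ti.xcom_pull.assert_called_once_with(task_ids="push")
```

A test written this way checks the wiring between the two callables only; it says nothing about whether a real executor would deliver the value.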

@turbaszek turbaszek force-pushed the feature/aip31_xcom_arg_model branch from e8c652e to 5c00ca8 Compare May 8, 2020 10:35
@turbaszek turbaszek force-pushed the feature/aip31_xcom_arg_model branch from 5c00ca8 to 955b45f Compare May 8, 2020 13:36
@turbaszek turbaszek requested a review from kaxil May 8, 2020 15:53
@kaxil kaxil merged commit bc19778 into apache:master May 9, 2020
@boring-cyborg

boring-cyborg bot commented May 9, 2020

Awesome work, congrats on your first merged pull request!

@kaxil kaxil added this to the Airflow 1.10.11 milestone May 10, 2020
@kaxil
Member

kaxil commented May 10, 2020

Tentatively added it to the 1.10.11 milestone. I will double-check when cherry-picking to make sure it doesn't break backward compatibility.

@ashb
Member

ashb commented May 19, 2020

So this .output wasn't in the AIP proposal.

Why have we introduced it?

@turbaszek
Member

turbaszek commented May 19, 2020

So this .output wasn't in the AIP proposal.

@ashb #8055 , main idea was to be able to mix functional and non-functional ops in one DAG.

@ashb
Member

ashb commented May 19, 2020

So this .output wasn't in the AIP proposal.

@ashb #8055 , main idea was to be able to mix functional and non-functional ops in one DAG.

Will the example from the AIP still (eventually) work?

with DAG(
    'send_server_ip', default_args=default_args, schedule_interval=None
) as dag:

  # Using default connection as it's set to httpbin.org by default
  get_ip = SimpleHttpOperator(
      task_id='get_ip', endpoint='get', method='GET', xcom_push=True
  )

  @dag.task(add_context=True)
  def email_subject_generator(context: dict, raw_json: str) -> str:
    external_ip = json.loads(raw_json)['origin']
    return f"Server connected from {external_ip}"

  @task
  def email_body_generator(raw_json: str) -> str:
    return """
    Seems like today your server executing Airflow is connected from the external IP {origin}<br>
    <br>
    Have a nice day!
    """.format(**json.loads(raw_json))

  send_email = EmailOperator(
      task_id='send_email',
      to="example@example.com",
      subject='',
      html_content=''
  )

  ip_info = get_ip()
  subject = email_subject_generator(ip_info)
  body = email_body_generator(ip_info)
  send_email(subject=subject, html_content=body)

@turbaszek
Member

Hm, I think not. I think it should be:

with DAG(
    'send_server_ip', default_args=default_args, schedule_interval=None
) as dag:

  # Using default connection as it's set to httpbin.org by default
  get_ip = SimpleHttpOperator(
      task_id='get_ip', endpoint='get', method='GET', xcom_push=True
  )

  @dag.task(add_context=True)
  def email_subject_generator(context: dict, raw_json: str) -> str:
    external_ip = json.loads(raw_json)['origin']
    return f"Server connected from {external_ip}"

  @task
  def email_body_generator(raw_json: str) -> str:
    return """
    Seems like today your server executing Airflow is connected from the external IP {origin}<br>
    <br>
    Have a nice day!
    """.format(**json.loads(raw_json))

  ip_info = get_ip()
  subject = email_subject_generator(ip_info)
  body = email_body_generator(ip_info)

  send_email = EmailOperator(
      task_id='send_email',
      to="example@example.com",
      subject=subject,
      html_content=body
  )

CC: @casassg

@ashb
Member

ashb commented May 19, 2020

Okay, but the email_subject_generator(ip_info) (vs email_subject_generator(ip_info.output) as it has to be right now) will be fixed? Do we already have an issue for that?

@casassg
Contributor

casassg commented May 19, 2020

Main idea is that calling regular operators may be a bit confusing vs setting XComArg on initialization. We can discuss this further with the @task PR coming soon, which also brings some more end-to-end examples of what this would look like. Even though it's a change from the initial AIP, I think it's for the best. Happy to write down the change and amend/propose it in the original doc. It was mainly an alternative that appeared while we were implementing.

@turbaszek
Member

Okay, but the email_subject_generator(ip_info) (vs email_subject_generator(ip_info.output) as it has to be right now)

It can be email_subject_generator(ip_info). The .output is only for operators defined in "old way" (not decorated with @task).

Calling a function decorated with @task returns an XComArg. Calling an operator returns an instance of this operator. So .output is shorthand: op.output == XComArg(op). But as @casassg says, we can adjust it once we have @task because then things will be simpler.
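
The distinction drawn here can be sketched with toy classes. Everything below is hypothetical: `Ref` stands in for XComArg, `ClassicOp` for an operator defined the "old way", and `task` for the decorator that had not been merged yet. The `DEPENDENCIES` bookkeeping is an assumption added for illustration, to show how mixing both styles in one DAG could still record the upstream task.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for XComArg: names the producing task."""
    task_id: str
    key: str = "return_value"


@dataclass
class ClassicOp:
    """Hypothetical operator defined the "old way"."""
    task_id: str

    @property
    def output(self) -> Ref:
        # Shorthand for building the reference by hand.
        return Ref(self.task_id)


# Assumed bookkeeping: downstream task_id -> upstream task_ids.
DEPENDENCIES: Dict[str, List[str]] = {}


def task(fn: Callable[..., Any]) -> Callable[..., Ref]:
    """Toy decorator: calling the function yields a Ref, not its result."""
    def wrapper(*args: Any) -> Ref:
        # Any Ref passed in is treated as an upstream dependency.
        DEPENDENCIES[fn.__name__] = [a.task_id for a in args if isinstance(a, Ref)]
        return Ref(fn.__name__)
    return wrapper


@task
def email_subject_generator(raw_json: str) -> str:
    return f"Server connected from {raw_json}"


get_ip = ClassicOp("get_ip")
subject = email_subject_generator(get_ip.output)
```

In this sketch `get_ip.output == Ref("get_ip")` holds because the dataclass compares by field values, which is the same relationship as `op.output == XComArg(op)` described above.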

Labels
AIP-31 Task Flow API for nicer DAG definition area:dev-tools
Projects
None yet
7 participants