New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AIRFLOW-3958] List tasks upstream work in chain #4779
[AIRFLOW-3958] List tasks upstream work in chain #4779
Conversation
|
I only found https://github.com/apache/airflow/blob/master/airflow/example_dags/example_short_circuit_operator.py#L47 using it of the entire project. Therefore I would suggest we remove the |
@feluelle I don't think we should remove I think function in t1 = DummyOperator(task_id='t1', dag=dag)
t2 = DummyOperator(task_id='t2', dag=dag)
...
tn = DummyOperator(task_id='tn', dag=dag)
t1 >> t2 >> ... >> tn or we just using function list_for_task = [DummyOperator(task_id='t{}'.format(i), dag=dag) for i in range(1, n + 1)]
chain(*list_for_task) I prefer the second way rather than the first one. One classic situation in my daily use is in data house, I want to transfer couple of Hive table and LOAD to different output system. I do like prepare_dw_table_for_ana_system = [
'prepare_step_1',
'prepare_step_2',
'prepare_step_3',
'prepare_step_4',
]
tasks_prepare_dw_table_for_ana_system = [
HiveOperator(
task='prepare_dw_table_for_ana_system_{}'.format(step),
sql='{}.sql'.format(step),
dag=dag
) for step in prepare_dw_table_for_ana_system
]
diff_system_adapter = [
'system_1',
'system_2',
'system_3',
'system_4',
]
tasks_diff_system_adapter = [
HiveOperator(
task='diff_system_adapter_{}'.format(system),
sql='{}.sql'.format(system),
dag=dag
) for system in diff_system_adapter
]
post_step = [DummyOperator(task_id='post_step_{}'.format(i), dag=dag) for i in range(3)]
chain(*tasks_prepare_dw_table_for_ana_system, tasks_diff_system_adapter, *post_step) I think make task in list could make the job more significative, because sometime we use muliti step to do one thing, and I want I refactor DAG just have to know what group tasks So, I don't think we should remove function |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is useful. This method is not available in the API reference documentation. We need to think about how to add it to this in future. Now the group of users of this function is very narrow.
Reference:
https://airflow.readthedocs.io/en/latest/code.html
@mik-laj I have a plan to add what do you think? |
I think that now the documentation has one problem that should be solved first. The concepts.rst file is too large. We must think how to divide it and what new files to create. For example, the Creating new files in the howto directory is the easiest way, because it is not required for the file to be one with the rest of the documentation. If you do not have a lot of time, you can create a new guide. Otherwise, you should think about creating a new file or files in main directory, which will contain other elements from the concept file. |
Agree with @feluelle that |
@mik-laj In this situation, I think it better to create new file in |
@feng-tao But |
@zhongjiajie , I personally prefer one suggested approach instead of multiple different approaches. I am open to other committers' opinions. But IMO, I would prefer we stick with the |
@feng-tao I got your point. thanks |
I think that there is no one solution to the problem. The same as the existing several arithmetic operations, when you only need to add. There are several arithmetic operations when you only need to add. Subtraction is the negation of addition. Multiplication is a shortened of the addition operation. Each operation has its purpose. Each arithmetic operation has its purpose and solves a real problem. In my opinion, we should not limit ourselves to just one solution. We should show a solution for a given category of problems. In my opinion, this change is correct if documentation is added. It should explain the differences between one and the other way and show examples for each method. Airflow is growing. Adding(bitwise operator) is not a solution for all problems. |
@zhongjiajie I'm on the same page as @feng-tao I think having two ways of doing the same thing is bad. I find the |
Hi @zhongjiajie , I agree with @feng-tao and @Fokko . Having multiple ways is a potential confusion. We are encouraging the usage of |
Reopen as Slack discuss https://apache-airflow.slack.com/archives/CCPRP7943/p1554374078016800 |
Codecov Report
@@ Coverage Diff @@
## master #4779 +/- ##
==========================================
+ Coverage 78.93% 78.94% +<.01%
==========================================
Files 480 480
Lines 30129 30141 +12
==========================================
+ Hits 23782 23794 +12
Misses 6347 6347
Continue to review full report at Codecov.
|
I will add some feature we discuss in Slack. |
Looks good @zhongjiajie however I wouldn't bother validating the input, it's not Pythonic and adds too much complexity. |
@bcb Thanks for review, I think some validate is nessarary, becase >>> a = [1,2,3]
>>> b = [4,5]
>>> list(zip(a,b))
[(1, 4), (2, 5)] # make user get the wrong dependence. |
airflow/utils/helpers.py
Outdated
for a, b in zip(task, down_task): | ||
a.set_downstream(b) | ||
else: | ||
raise AirflowException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://docs.python.org/2/library/itertools.html#itertools.product is what you want I think?
# product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really, that the cross
mean, we already have cross_downstream
in airflow.utils.helper
.I am inspired in https://apache-airflow.slack.com/archives/CCPRP7943/p1554633832061000?thread_ts=1554374078.016800&cid=CCPRP7943 , thinking we should have a function to make parallel pipelines work
/ -> t1 -> t3
t0
\ -> t2 -> t4
If we do that, chain
could do chain(t0, [t1, t2, t3], [t4, t5, t6])
like
/ -> t1 -> t4
t0 -> t2 -> t5
\ -> t3 -> t6
and could cross_downstream([t1, t2, t3], [t4, t5, t6])
t1 \ / -> t4
\ /
t2 -> X -> t5
/ \
t3 / \ -> t6
make our dependent more possibilities.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @zhongjiajie , it wouldn't make sense to do that.
However I still think you should just zip and join the lists, without checking they're the same size. Some tasks in the bigger list won't be joined to anything - that's fine, the user will realise their mistake.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, product
is totally different behaviour.
I'm not so sure that it would be obvious - the "extra" tasks from the long list would instead end up with no deps set. That sounds like it would be more confusing than the behaviour in the PR: an error would show up in the UI.
But I am not a user of this fn so I can't say for certain.
ci green , please take a look if you have time @potiuk |
I still think adding docs in the same PR is necessary. I am in favour of adding documentation together with the relevant code that is modified. Especially in this case, this is a new way of building the DAG relationships which is the "core" of Airflow really and everyone should know how to use it. Having some PR / slack discussion describing such feature is really bad way of documenting it. Especially that this discussion was long and had many twists so it is unclear what the final intention was without reading the whole thread. Also several other people had their strong opinions here so maybe it's good if everyone who participated will state their opinion on the current code. I don't think we all have to fully agree but maybe just one last round of comments would be great, then documentation should be added to reflect it and then we merge it. @zhongjiajie @feng-tao @Fokko @feluelle @mik-laj @XD-DENG ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code and tests LGTM 👍
+1 on adding docs to this PR. Docs should come always along with a new feature.
@zhongjiajie you want that the feature gets attention - docs would help on that :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add some docs on it
helpers.chain only support list as downstream This PR make list as upstream work, also make list parallel work, which like below / -> t2 -> t4 \ t1 -> -> t6 \ -> t3 -> t5 /
91d1469
to
37c27da
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Explain why I change old docs
@@ -185,6 +185,9 @@ Bitshift Composition | |||
|
|||
*Added in Airflow 1.8* | |||
|
|||
We recommend you setting operator relationships with bitshift operators rather than ``set_upstream()`` | |||
and ``set_downstream()``. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add this to let users know that we recommend set relationships using bitshift operators
@@ -248,21 +251,92 @@ Bitshift can also be used with lists. For example: | |||
|
|||
.. code:: python | |||
|
|||
op1 >> [op2, op3] | |||
op1 >> [op2, op3] >> op4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let users know >>
could work on list[operatos]
as upstream
LGTM 👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -150,19 +151,50 @@ def as_flattened_list(iterable): | |||
|
|||
|
|||
def chain(*tasks): | |||
""" | |||
r""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this r
for? Is it a typo or a way to render something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both string and bytes literals may optionally be prefixed with a letter 'r' or 'R'; such strings are called raw strings and treat backslashes as literal characters
This makes the \
in the ascii diagram be a backslash. I imagine otherwise it would complain about \
being an invalid escape.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense 👍 Thanks @ashb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks ash for the clarification
Thanks @kaxil for merging and everyone review and suggest. |
Jira
Description
For now on,
airflow.utils.helpers.chain
only support list as downstream, but not upstream. This ticket is to support list as upstream.Only support
task_1 --> task_2 --> task_3 --> task_4 or /--> task_3 task_1 --> task_2 \--> task_4
This improvement to be support
Tests
Commits
Documentation
Code Quality
flake8