-
Notifications
You must be signed in to change notification settings - Fork 13.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Edgemodifier refactoring w/ labels in TaskGroup edge case #29410
Conversation
6156955
to
300b4a5
Compare
@pierrejeambrun , Elad mentioned I could reach out to you for some feedback. I'm hoping to refactor my logic in |
300b4a5
to
7ae4368
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is a good idea to add the test case you mentioned with TaskGroup + XComArg cycle detection. Josh added a few tests related to this same issue here #23300 and we can definitely add more.
Overall looking good to me, but I am not very familiar with our task dependency management code. Maybe someone more knowledgeable here can take a look. @josh-fell or @uranusjr for instance ?
Can you rebase the branch to re-trigger the CI. It seems that a few jobs were cancelled.
7ae4368
to
fec9ea5
Compare
Sure @pierrejeambrun . I've added this test case (test_multiple_task_groups_dag and test_multiple_task_groups_reversed_dag) and updated the table above with it (multiple_task_groups and multiple_task_groups_reversed). |
fec9ea5
to
6ecbdf7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work @vchiapaikeo! Indeed a bit of a tangled web to manage here, but looks good to me. I pulled down the branch and ran some other combos against it too.
A couple things, I may have missed it in the tests, but it would be great to solidify a test for XComArg dependencies in TaskGroups with a label between them too (i.e. TaskFlowFunc1 >> SomeLabel >> TaskFlowFunc2[<consuming XComArg from TaskFlowFunc1]). Also, since tests are net-new, let's not use "dummy" in the task IDs to adhere to our inclusive language use.
6ecbdf7
to
678a2b2
Compare
Ah good catch @josh-fell - added this and updated the screenshots in the description. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
@pierrejeambrun any further concerns?
LGTM! |
* Edgemodifier refactoring w/ labels in TaskGroup edge case (cherry picked from commit 4b05468)
* Edgemodifier refactoring w/ labels in TaskGroup edge case (cherry picked from commit 4b05468)
closes: #17469
This PR attempts to correct the issues in #21404 which was reverted.
Much of the logic is the same - however, there is one edge case that we catch - when nodes within an EdgeModifier's streams are in the same TaskGroup. When this happens, we should not be converting the nodes in an EdgeModifier's streams to their respective TaskGroups because that will cause a cycle.
I'll admit that the logic in
_convert_streams_to_task_groups
is a bit convoluted and can probably be refactored further. The idea here is to try to identify if the nodes within the streams are part of the same TaskGroup. If they are, we choose not to convert them to TaskGroups. Otherwise, we will convert them to TaskGroups.I'm looking for feedback in
_convert_streams_to_task_groups
. I'm hoping there's a better way to write this logic.Test Cases
I copied the same unit tests from #21404. They are pretty robust. I also ran all the test DAGs from this PR as well and saw that they worked as expected. I added a few test dags of my own to capture the case where a label is created inside a TaskGroup and the XComArg case. In a later commit, I can add these other test dags as well to the unit tests showing that a cycle is no longer detected.
Labels Test DAGs: labels_test_dags.zip
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.