Introduce TaskMixin #10930

Merged
merged 9 commits into from
Sep 16, 2020

turbaszek
Member

Both BaseOperator and XComArg implement the bit shift operators
used to chain tasks in DAGs. By extracting this logic into a new
mixin we reduce code duplication and make it easier to implement
chaining in other classes in the future.

Closes: #10926
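
A minimal sketch of the idea, not the merged code: the shift operators live in one abstract base class and delegate to set_upstream / set_downstream, which each class implements itself. The names TaskMixinSketch and ToyTask are hypothetical, and the list handling is simplified for illustration.

```python
from abc import ABC, abstractmethod
from typing import List, Set, Union


class TaskMixinSketch(ABC):
    """Hypothetical stand-in for the mixin; not the merged Airflow code."""

    @abstractmethod
    def set_upstream(self, other: Union["TaskMixinSketch", List["TaskMixinSketch"]]) -> None:
        """Make `other` run before this object."""

    @abstractmethod
    def set_downstream(self, other: Union["TaskMixinSketch", List["TaskMixinSketch"]]) -> None:
        """Make `other` run after this object."""

    def __rshift__(self, other):
        """self >> other: `other` runs after `self`."""
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        """self << other: `other` runs before `self`."""
        self.set_upstream(other)
        return other

    def __rrshift__(self, other):
        """[a, b] >> self: Python falls back here because list has no __rshift__."""
        self.__lshift__(other)
        return self

    def __rlshift__(self, other):
        """[a, b] << self: reflected form of __lshift__."""
        self.__rshift__(other)
        return self


class ToyTask(TaskMixinSketch):
    """Toy implementer that only records the names of its neighbours."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.upstream_names: Set[str] = set()
        self.downstream_names: Set[str] = set()

    @staticmethod
    def _as_list(other) -> list:
        return other if isinstance(other, list) else [other]

    def set_upstream(self, other) -> None:
        for task in self._as_list(other):
            self.upstream_names.add(task.name)
            task.downstream_names.add(self.name)

    def set_downstream(self, other) -> None:
        for task in self._as_list(other):
            self.downstream_names.add(task.name)
            task.upstream_names.add(self.name)


a, b, c, d = (ToyTask(name) for name in "abcd")
a >> b >> c      # chaining works because __rshift__ returns the right operand
[a, b] >> d      # handled by ToyTask.__rrshift__
```

Any class that inherits the base and implements the two abstract methods gets all four operators without repeating them.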



@turbaszek
Member Author

@casassg and @yuqian90 you may want to take a look 😉

@turbaszek
Member Author

Ok, this one is unexpected:

Too many ancestors (8/7) (too-many-ancestors)

What should we do:
A. Disable it locally in every affected operator
B. Increase limit of ancestors
C. Disable this check in pylintrc, as well as others like limit of arguments in class

Any opinions @mik-laj @kaxil @potiuk @ashb ?

@kaxil
Member

kaxil commented Sep 14, 2020

Ok, this one is unexpected:

Too many ancestors (8/7) (too-many-ancestors)

What should we do:
A. Disable it locally in every affected operator
B. Increase limit of ancestors
C. Disable this check in pylintrc, as well as others like limit of arguments in class

Any opinions @mik-laj @kaxil @potiuk @ashb ?

I would say B. increase the limit in pylintrc:

airflow/pylintrc

Lines 551 to 587 in eaa49b2

[DESIGN]
# Maximum number of arguments for function / method.
# BasPH: choose 10 because this was 80% of the sorted list of number of arguments above 5 (Pylint default)
max-args=10
# Maximum number of attributes for a class (see R0902).
# BasPH: choose 15 because this was 80% of the sorted list of number of attributes above 7 (Pylint default)
max-attributes=15
# Maximum number of boolean expressions in an if statement.
max-bool-expr=5
# Maximum number of branch for function / method body.
# BasPH: choose 22 because this was 80% of the sorted list of number of attributes above 12 (Pylint default)
max-branches=22
# Maximum number of locals for function / method body.
# BasPH: choose 24 because this was 80% of the sorted list of number of locals above 15 (Pylint default)
max-locals=24
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of public methods for a class (see R0904).
# BasPH: choose 27 because this was 50% of the sorted list of 30 number of public methods above 20 (Pylint default)
max-public-methods=27
# Maximum number of return / yield for function / method body.
max-returns=6
# Maximum number of statements in function / method body.
# BasPH: choose 69 because this was 80% of the sorted list of number of statements above 50 (Pylint default)
max-statements=69
# Minimum number of public methods for a class (see R0903).
min-public-methods=0

Currently max-parents=7; we could increase it to 8.
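
For contrast, option A (disabling the check locally) would look roughly like the sketch below. All class names are hypothetical stand-ins. The sketch also shows why adding one mixin to a base class raises the ancestor count of every subclass by one. How pylint counts ancestors exactly (for instance, whether object is included) is not verified here; the MRO length is used only as an approximation.

```python
class LoggingMixinSketch:
    """Stand-in for a base class that already exists."""


class TaskMixinSketch:
    """Stand-in for the newly added mixin."""


class BaseOperatorBefore(LoggingMixinSketch):
    """Base class before the mixin is added."""


class BaseOperatorAfter(LoggingMixinSketch, TaskMixinSketch):
    """Base class after the mixin is added."""


class MyOperatorBefore(BaseOperatorBefore):
    """A concrete operator built on the old base class."""


# Option A: silence the check on one affected class only.
class MyOperatorAfter(BaseOperatorAfter):  # pylint: disable=too-many-ancestors
    """A concrete operator built on the new base class."""


def ancestor_count(cls: type) -> int:
    """Number of classes in the MRO, excluding cls itself (object is included)."""
    return len(cls.__mro__) - 1


before = ancestor_count(MyOperatorBefore)
after = ancestor_count(MyOperatorAfter)
```

With option A the comment has to be repeated on every operator that crosses the limit, whereas option B is a single change to max-parents in pylintrc.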

@ashb
Member

ashb commented Sep 14, 2020

I'd vote B too.

"""

@abstractmethod
def set_upstream(self, other):
Contributor

Do we want set_upstream to handle things such as List[XComArg] or List[TaskGroup]? If so, maybe we should add the typing to indicate other can be Union[TaskMixin, List[TaskMixin]].

Member Author

I would love to add some type hints, but I'm not sure that using TaskMixin is very informative. And I think any other approach will create cyclic imports.
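
A sketch of the annotation under discussion, assuming the hints refer only to the mixin itself. Because nothing from the operator or XComArg modules is imported, this form should not introduce an import cycle. TaskMixinSketch, TaskOrTasks, as_task_list and ToyTask are hypothetical names, and only set_upstream is modelled.

```python
from abc import ABC, abstractmethod
from typing import List, Sequence, Union

# Forward references as strings, so the alias can be declared before the class.
TaskOrTasks = Union["TaskMixinSketch", Sequence["TaskMixinSketch"]]


class TaskMixinSketch(ABC):
    """Hypothetical mixin whose hints mention only the mixin type."""

    @abstractmethod
    def set_upstream(self, other: "TaskOrTasks") -> None:
        """Accept either one object or a sequence of objects."""


def as_task_list(other: TaskOrTasks) -> List[TaskMixinSketch]:
    """Normalize a single object or a sequence into a plain list."""
    if isinstance(other, TaskMixinSketch):
        return [other]
    return list(other)


class ToyTask(TaskMixinSketch):
    """Toy implementer that records upstream names in call order."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.upstream: List[str] = []

    def set_upstream(self, other: TaskOrTasks) -> None:
        self.upstream.extend(task.name for task in as_task_list(other))


task_c = ToyTask("c")
task_c.set_upstream(ToyTask("a"))        # single object
task_c.set_upstream([ToyTask("b")])      # list of objects
```

The hint says less than a concrete BaseOperator or XComArg annotation would, which is the trade-off raised in the comment above.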

Contributor

@casassg casassg left a comment

Great! DRY!

@kaxil
Member

kaxil commented Sep 14, 2020

Static check is failing

@yuqian90
Contributor

Thanks for the PR. This is a great improvement. I think there is room to "DRY" this further. For example, we have a lot of branching logic like this in these methods:

In BaseOperator._set_relatives(), we have this:

        if isinstance(task_or_task_list, XComArg):
            # otherwise we will start to iterate over xcomarg
            # because of the "list" check below
            # with current XComArg.__getitem__ implementation
            task_list = [task_or_task_list.operator]
        ...

When we add TaskGroup, we'll have a few more isinstance checks. We might be able to use some polymorphism to replace most of the if isinstance in these methods. One idea is to introduce a get_roots() and get_leaves() method within TaskMixin.

BaseOperator just returns itself as a list:

class BaseOperator(...):
    def get_roots(self) -> List["BaseOperator"]:
        return [self]

    def get_leaves(self) -> List["BaseOperator"]:
        return [self]

XComArg returns the underlying operator:

class XComArg(...):
    def get_roots(self) -> List["BaseOperator"]:
        return [self.operator]

    def get_leaves(self) -> List["BaseOperator"]:
        return [self.operator]

TaskGroup has its own way of returning leaves or roots:

class TaskGroup(...):
    def get_roots(self) -> List["BaseOperator"]:
        ...

    def get_leaves(self) -> List["BaseOperator"]:
        ...

I think get_roots() and get_leaves() will be able to replace all the isinstance checks in set_upstream/set_downstream of XComArg, TaskGroup, BaseOperator.

What do you think?

@turbaszek
Member Author

I think get_roots() and get_leaves() will be able to replace all the isinstance checks in set_upstream/set_downstream of XComArg, TaskGroup, BaseOperator.

I must admit that I didn't get the get_roots and get_leaves idea fully. However, I implemented the operator property that should return BaseOperator and I think it serves the same purpose. WDYT @yuqian90 ?

@yuqian90
Contributor

I think get_roots() and get_leaves() will be able to replace all the isinstance checks in set_upstream/set_downstream of XComArg, TaskGroup, BaseOperator.

I must admit that I didn't get the get_roots and get_leaves idea fully. However, I implemented the operator property that should return BaseOperator and I think it serves the same purpose. WDYT @yuqian90 ?

I see. For singular operators such as XComArg and BaseOperator, having a single operator property would suffice. However, when it comes to a group of operators with their own inter-dependencies, I don't think it's sufficient to return all the operators in the group. For example, when we do this, where operator1 is a singular operator, and group1 is a TaskGroup,

operator1 >> group1

We only want to set all the "roots" in group1 as downstream of operator1. So the operator1.set_downstream() somehow needs to have a way to get all the "roots" of group1. Here, "roots" means all the tasks within group1 that do not have any upstream dependencies within group1.

The same idea applies to "leaves" when we do the following: only the "leaves" in group1 need to be pulled out and set upstream of operator1.

operator1 << group1

I think what you have at the moment (operator property) is good enough though for this PR's purpose. The "roots" vs "leaves" problem is very specific to TaskGroup. I'm fine with having such logic live within TaskGroup.
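
To make the "roots" and "leaves" definitions concrete, here is a toy sketch. ToyTask and ToyGroup are hypothetical and are not the TaskGroup implementation: a root is a task with no upstream dependency inside the group, and a leaf is a task with no downstream dependency inside the group.

```python
from typing import Dict, List, Set


class ToyTask:
    """Toy task that records upstream and downstream task ids."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_ids: Set[str] = set()
        self.downstream_ids: Set[str] = set()

    def set_downstream(self, other: "ToyTask") -> None:
        self.downstream_ids.add(other.task_id)
        other.upstream_ids.add(self.task_id)


class ToyGroup:
    """Toy group: a set of tasks that may depend on each other."""

    def __init__(self, tasks: List[ToyTask]) -> None:
        self.tasks: Dict[str, ToyTask] = {task.task_id: task for task in tasks}

    def get_roots(self) -> List[ToyTask]:
        # Tasks with no upstream dependency inside the group.
        return [t for t in self.tasks.values() if t.upstream_ids.isdisjoint(self.tasks)]

    def get_leaves(self) -> List[ToyTask]:
        # Tasks with no downstream dependency inside the group.
        return [t for t in self.tasks.values() if t.downstream_ids.isdisjoint(self.tasks)]


# Inside the group: a -> b and a -> c.
a, b, c = ToyTask("a"), ToyTask("b"), ToyTask("c")
a.set_downstream(b)
a.set_downstream(c)
group1 = ToyGroup([a, b, c])
operator1, operator2 = ToyTask("operator1"), ToyTask("operator2")

# Equivalent of `operator1 >> group1`: only the roots follow operator1.
for root in group1.get_roots():
    operator1.set_downstream(root)

# Equivalent of `operator2 << group1`: only the leaves precede operator2.
for leaf in group1.get_leaves():
    leaf.set_downstream(operator2)
```

Dependencies that cross the group boundary do not change which tasks count as roots or leaves, because only ids that belong to the group are considered.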

@@ -1146,27 +1115,22 @@ def add_only_new(self, item_set: Set[str], item: str) -> None:
else:
item_set.add(item)

@property
def operator(self) -> "BaseOperator":
Contributor

One request. Is it possible to make this property a List[BaseOperator] instead? If we want TaskGroup to implement TaskMixin, it'll be difficult to decide what operator should be if it's a singular operator.

Member Author

I refactored it a bit, so now it's a roots property, more in line with your initial proposal.
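
A rough sketch of how a roots property can replace per-type isinstance checks when setting relatives. RootsMixinSketch, ToyOperator and ToyXComArg are hypothetical names, and the merged code may differ in detail.

```python
from typing import List, Sequence, Set, Union


class RootsMixinSketch:
    """Hypothetical mixin: every implementer exposes its root operators."""

    @property
    def roots(self) -> List["ToyOperator"]:
        raise NotImplementedError


class ToyOperator(RootsMixinSketch):
    """A single operator is its own only root."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream_ids: Set[str] = set()

    @property
    def roots(self) -> List["ToyOperator"]:
        return [self]

    def set_downstream(
        self, other: Union[RootsMixinSketch, Sequence[RootsMixinSketch]]
    ) -> None:
        # The list check remains, but nothing here asks which class each item is.
        items = other if isinstance(other, (list, tuple)) else [other]
        for item in items:
            for operator in item.roots:
                self.downstream_ids.add(operator.task_id)


class ToyXComArg(RootsMixinSketch):
    """Wrapper whose root is the operator it wraps."""

    def __init__(self, operator: ToyOperator) -> None:
        self.operator = operator

    @property
    def roots(self) -> List[ToyOperator]:
        return [self.operator]


op1, op2, op3 = ToyOperator("op1"), ToyOperator("op2"), ToyOperator("op3")
op1.set_downstream([ToyXComArg(op2), op3])
```

A group-like class could join in by returning several operators from roots, which is why a list is more flexible here than a single operator.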

Contributor

@yuqian90 yuqian90 left a comment


Great. Thank you!

@@ -66,4 +66,4 @@ def print_value(value):
xcom_args_a = print_value("first!") # type: ignore
xcom_args_b = print_value("second!") # type: ignore

-bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2
+bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2 # type: ignore
Member

Oh! Is mypy complaining here?

Member Author

@turbaszek turbaszek Sep 16, 2020

I don't get it, and I would prefer to solve it in another PR, as I believe this will require either a mypy plugin or some type changes in the @task decorator. The original error:

airflow/example_dags/example_xcomargs.py:66: error: Value of type variable "T"
of "print_value" cannot be "str"
        xcom_args_a = print_value("first!")
                      ^
airflow/example_dags/example_xcomargs.py:67: error: Value of type variable "T"
of "print_value" cannot be "str"
        xcom_args_b = print_value("second!")
                      ^
airflow/example_dags/example_xcomargs.py:69: error: Unsupported left operand
type for >> ("BashOperator")
        bash_op1 >> xcom_args_a >> xcom_args_b >> bash_op2

Member Author

ok, fixed? Use @task() instead of @task

Member

@kaxil kaxil Sep 16, 2020

Hmm, interesting. Any idea here, @casassg? A bare @task should work.

This can be addressed in a separate PR though

Member Author

Both @task and @task() work; however, the type annotation of this decorator seems to be confusing mypy (although it looks valid to me).
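
For illustration only, one common way to annotate a decorator that accepts both the bare and the called form is typing.overload. The sketch below is an assumption about the general technique, not the Airflow @task decorator: task_sketch is a hypothetical name, it returns the plain function result rather than an XComArg, and whether this pattern would resolve the mypy errors quoted above was not checked.

```python
import functools
from typing import Any, Callable, Optional, TypeVar, Union, cast, overload

F = TypeVar("F", bound=Callable[..., Any])


@overload
def task_sketch(func: F) -> F:
    ...


@overload
def task_sketch(func: None = None, **kwargs: Any) -> Callable[[F], F]:
    ...


def task_sketch(
    func: Optional[F] = None, **kwargs: Any
) -> Union[F, Callable[[F], F]]:
    """Hypothetical decorator usable as @task_sketch or @task_sketch(...)."""

    def wrap(target: F) -> F:
        @functools.wraps(target)
        def inner(*args: Any, **inner_kwargs: Any) -> Any:
            return target(*args, **inner_kwargs)

        # Keep the decorator options around so they can be inspected later.
        inner.options = kwargs  # type: ignore[attr-defined]
        return cast(F, inner)

    if func is not None:
        # Bare form: the decorated function is passed in directly.
        return wrap(func)
    # Called form: return the real decorator.
    return wrap


@task_sketch
def print_bare(value: str) -> str:
    return value


@task_sketch(retries=2)
def print_called(value: str) -> str:
    return value
```

The first overload covers the bare form and the second covers the called form, so a type checker can pick a distinct return type for each usage.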

@turbaszek
Member Author

The single failing build seems to be flaky. @kaxil, are we good with merging this PR?

@kaxil
Member

kaxil commented Sep 16, 2020

Yup 🚀

@turbaszek turbaszek merged commit 0779688 into apache:master Sep 16, 2020
@turbaszek turbaszek deleted the task-mixin branch September 16, 2020 11:56
yuqian90 added a commit to yuqian90/airflow that referenced this pull request Sep 18, 2020
yuqian90 added a commit to yuqian90/airflow that referenced this pull request Sep 21, 2020
yuqian90 pushed a commit to yuqian90/airflow that referenced this pull request Sep 24, 2020
Both BaseOperator and XComArgs implement bit shift operators
used to chain tasks in DAGs. By extracting this logic to new
mixin we reduce code duplication and make it easier to implement
it in the future. This change also introduces a roots property that lets us
reduce the number of type checks.

(cherry picked from 0779688)
yuqian90 pushed a commit to yuqian90/airflow that referenced this pull request Jan 11, 2021