
[AIRFLOW-8057] [AIP-31] Add @task decorator #8962

Merged
merged 43 commits into from
Jun 23, 2020

Conversation

casassg
Contributor

@casassg casassg commented May 21, 2020

Airflow AIP-31 task decorator implementation. This decorator should facilitate wrapping a function into an operator and using it as such in a DAG. Closes #8057. Closes #8056.

  • Should be used without args or with args/kwargs for the underlying operator:
@task
def simple_task(...):

@task(dag=dag)
def simple_task(...):
  • Task ID should be the function name by default.
  • Decorator should return an instance of PythonFunctionalOperator. This can be used to set task dependencies. Ex:
@task
def simple_task(...):
    pass

simple_task >> another_task
  • Adding documentation on @task and functional style dags in concepts.rst
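The decorator pattern the bullets above describe can be sketched in plain, dependency-free Python. This is a simplified illustration with hypothetical names (FakeOperator, and this toy `task`), not the actual Airflow implementation:

```python
# A toy stand-in for an operator: holds the callable, a task_id, and deps.
class FakeOperator:
    def __init__(self, func, task_id=None):
        self.func = func
        # Task ID defaults to the function name, as the PR describes.
        self.task_id = task_id or func.__name__
        self.downstream = []

    def __rshift__(self, other):
        # `a >> b` records b as downstream of a, mirroring operator chaining.
        self.downstream.append(other)
        return other

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)


def task(_func=None, **op_kwargs):
    """Usable bare (@task) or with kwargs (@task(task_id='...'))."""
    def wrap(func):
        return FakeOperator(func, task_id=op_kwargs.get('task_id'))
    # Bare usage: @task passes the function directly.
    if _func is not None:
        return wrap(_func)
    # Parenthesized usage: @task(...) returns the decorator.
    return wrap


@task
def simple_task():
    return 42


@task(task_id='custom_id')
def another_task():
    return 'done'


simple_task >> another_task
```

The dual bare/parenthesized dispatch is the standard Python idiom for decorators that optionally take arguments.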

Work done in collaboration from @turbaszek and @evgenyshulman


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg

boring-cyborg bot commented May 21, 2020

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, pylint and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://apache-airflow-slack.herokuapp.com/

Contributor

@aoen aoen left a comment

Wow what a first commit :). I think mine was a 1 line bug fix.

@@ -83,7 +83,7 @@ def __getitem__(self, item):
         """
         Implements xcomresult['some_result_key']
         """
-        return XComArg(operator=self.operator, key=item)
+        return XComArg(operator=self.operator, key=str(item))
Contributor

Why the cast? Seems being explicit about the type received here might be safer, and the cast can happen on the caller side, otherwise this weakens type safety of this arg for all XCOM use cases.

Contributor Author

The key will always be a string, as it's cast when pushing the field to the XCom DB. This is just to make things easier to use when doing multiple_outputs, and also to ensure consistency. This allows you to do:

res[1]

which in reality is more transparent.

It was a change we missed in #8652
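The behaviour being discussed can be sketched with a hypothetical stand-in (FakeXComArg, not the real class): since XCom stringifies keys on push, stringifying again in __getitem__ makes res[1] and res['1'] resolve to the same stored value.

```python
class FakeXComArg:
    def __init__(self, pushed):
        # Simulate the XCom table, where keys were cast to str on push.
        self._pushed = {str(k): v for k, v in pushed.items()}

    def __getitem__(self, item):
        # Mirror the cast on retrieval so int and str keys behave alike.
        return self._pushed[str(item)]


res = FakeXComArg({1: 'one', 'two': 2})
```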

Member

Any chance you could pull this (and the test for it) out to a separate PR please?

Contributor Author

related discussion: databand-ai#5 (comment)

This small cast makes the multiple_outputs piece work more reliably and more transparently for the user. It's not actually a new feature, but mostly a fix of what already got merged in XComArg.

I can break it into a separate PR, but note that the XCom class already does this when saving the key (not when retrieving it).

airflow/operators/python.py (resolved)
) -> None:
# Check if we need to generate a new task_id
task_id = kwargs.get('task_id', None)
dag = kwargs.get('dag', None) or DagContext.get_current_dag()
Contributor

Will this function play nicely if neither side of the or returns true and task_id is None, e.g. if someone initializes a task and then adds it to a DAG later?

I guess it doesn't really make sense with this pattern to specify dag_id later, so I think raising an exception if the DAG is not specified would be reasonable (+ accompanying unit test).

Contributor Author

The DAG can be left unspecified when the operator is initialized outside of a DAG context, and you want it to bind when you call it. If you manually assign it to the DAG afterwards, then it won't work, that's true. Not sure if I should check this here.

) -> None:
# Check if we need to generate a new task_id
task_id = kwargs.get('task_id', None)
dag = kwargs.get('dag', None) or DagContext.get_current_dag()
Contributor

Why default 'dag'/task_id to None?

Contributor Author

dag -> because we may declare the task outside without an explicit dag (and we want to fall back to current_dag then).
task_id -> not really any reason, can switch it.
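The fallback being described can be sketched without Airflow (FakeDagContext is a hypothetical stand-in for DagContext): an explicit dag kwarg wins, otherwise the innermost DAG context is used.

```python
class FakeDagContext:
    _stack = []

    @classmethod
    def push(cls, dag):
        cls._stack.append(dag)

    @classmethod
    def pop(cls):
        return cls._stack.pop()

    @classmethod
    def get_current_dag(cls):
        return cls._stack[-1] if cls._stack else None


def resolve_dag(**kwargs):
    # Same shape as: kwargs.get('dag', None) or DagContext.get_current_dag()
    return kwargs.get('dag', None) or FakeDagContext.get_current_dag()


FakeDagContext.push('my_dag')
assert resolve_dag() == 'my_dag'            # falls back to the context
assert resolve_dag(dag='other') == 'other'  # explicit kwarg wins
FakeDagContext.pop()
assert resolve_dag() is None                # no DAG anywhere
```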

airflow/operators/python.py (outdated, resolved)
airflow/operators/python.py (outdated, resolved)
airflow/operators/python.py (outdated, resolved)
super().__init__(*args, **kwargs)
self.python_callable = python_callable
self.multiple_outputs = multiple_outputs
self._kwargs = kwargs
Contributor

How does the use of kwargs relate to their deprecation in Airflow 2.0:
e.g. in the BaseOperator code:


        if args or kwargs:
            # TODO remove *args and **kwargs in Airflow 2.0
            warnings.warn(
                'Invalid arguments were passed to {c} (task_id: {t}). '
                'Support for passing such arguments will be dropped in '
                'Airflow 2.0. Invalid arguments were:'
                '\n*args: {a}\n**kwargs: {k}'.format(
                    c=self.__class__.__name__, a=args, k=kwargs, t=task_id),
                category=PendingDeprecationWarning,
                stacklevel=3
            )

Does this need a deprecation warning too? Should we just not allow kwargs/args here in the first place, or is it needed for backwards compatibility?

Contributor Author

This already calls BaseOperator.__init__, so I'm guessing it will already give you a warning.

Contributor

Should we just drop the kwargs/etc support in the new operator though since it's deprecated?

Member

Yes, I think that would be better. +1, for dropping it.

Contributor Author

Not sure what you mean here. How then is the operator supposed to pass down kwargs like owner, which is a valid BaseOperator kwarg? If I remove kwargs here, then I won't be able to set owner in this operator.


def __call__(self, *args, **kwargs):
# If args/kwargs are set, then operator has been called. Raise exception
if self._called:
Contributor

Curious why the functions can't be reused; that seems a bit annoying for users. I wonder if we can fix this (e.g. late-binding the task_ids or something...) or add a TODO; might be worth adding a comment here. Checking self._called feels a bit hacky, as it kind of couples the task execution with global state.

Contributor Author

Mainly it's because of 1 function == 1 operator. The alternative was the idea that @evgenyshulman proposed, which is to use functions as operator generators. This simplifies things a bit, but then you can't change the operator later on (it's never accessible in the DAG file itself). I've seen both approaches taken here (either 1-to-1 or 1-to-many). I like the 1-to-1 better, mainly because it allows you to use it as you would use any operator later and feels a bit more intuitive.

Member

Is this an execution-time error, or does this happen at parse time?

We don't appear to have any tests that cover this (or I missed it). Can you add some, and add a section about this to the docs?

Member

Do I correctly understand that this will not work?

@task 
def update_user(user_id: str):
    ...

with DAG(...):
    # Fetch list of users 
    ...
    # Execute task for each user
    for user_id in users_list:
        update_user(user_id)

Contributor Author

It will fail at parse time.

Correct. 1 function == 1 operator. Airflow doesn't allow dynamic operators (executing 1 operator several times). You can still work around it.

This will work though:

@task 
def update_user(user_id: str):
    ...

with DAG(...):
    # Fetch list of users 
    ...
    # Execute task for each user
    for user_id in users_list:
        update_user.copy(f'update_{user_id}')(user_id)

Member

Hm, I have mixed feelings:

  • copy is not self-explanatory in this case imho
  • in such a case, shouldn't we generate an auto id? Or at least can we try to do update_user(user_id, task_id=user_id)?

Member

Agree with turbaszek. This is not self-explanatory at all. I think having update_user(user_id, task_id=...) would be much better. We can access the function signature inside task. This should work and is more self-explanatory.

Contributor Author

@casassg casassg May 27, 2020

My main worry is then: what is update_user? What you are describing here is using update_user as an operator factory. It has its value, but it also feels too magic to me atm. If update_user is a factory, then you can't change the operator instance at all or use it to set non-data dependencies.

We could capture the task_id kwarg and generate a new operator, but then what is update_user: the first operator, the latest one? What does update_user represent?

You can either do (1) update_user(i) for i in range(20) or (2) update_user >> other_operation, but not both. I prefer to support the 2nd option as it adapts more to what Airflow already does with operators.

Member

@turbaszek turbaszek May 28, 2020

We could capture task_id kwarg and generate a new operator, but then what is update_user the first operator, the latest one? What does update_user represent?

For me update_user is a function, and as a function it can be called many times with different input, thus yielding different results (here, creating a new task). I have never met a "function as a singleton" pattern. If we don't want to generate task_id for users then we may consider raising an exception on the second invocation when no custom task_id is passed.

My point is: this is a function, I expect to be able to call it as many times as I wish. I expect Airflow to treat each call of this function (in the proper context) as creating a new task.

You can either do (1) update_user(i) for i in range(20) or (2) update_user >> other_operation, but not both. I prefer to support 2nd option as it adapts more to what Airflow already does with operators.

Why should I not be able to do this? This is something that I saw many times.

first_task = BashOperator()
last_task = BashOperator()

for user_id in users_list:
   first_task >> update_user(user_id) >> last_task

Member

Yeah, this use case is exactly what we want to support.

The issue being we'd have to generate a new task for each one (doable, that isn't a problem; we just need to work out what task_id to give it).

Perhaps taking a leaf out of pytest or https://pypi.org/project/parameterized/ for how they name functions/test cases?
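One possible naming scheme in the spirit of parameterized/pytest ids, sketched below (illustrative, not necessarily what the PR ends up doing): append __N to the base task_id until it is unique among the DAG's existing ids.

```python
import re


def unique_task_id(task_id, existing_ids):
    """Return task_id, or task_id__N for the smallest unused N."""
    if task_id not in existing_ids:
        return task_id
    # Strip any existing __N suffix to find the base name.
    core = re.split(r'__\d+$', task_id)[0]
    suffixes = [
        int(tid.rsplit('__', 1)[1])
        for tid in existing_ids
        if re.match(rf'^{re.escape(core)}__\d+$', tid)
    ]
    return f'{core}__{max(suffixes) + 1}' if suffixes else f'{core}__1'
```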

raise AirflowException('@task decorated functions can only be called once. If you need to reuse '
'it several times in a DAG, use the `copy` method.')

# If we have no DAG, reinitialize class to capture DAGContext and DAG default args.
Contributor

Not great that this, and potential parse-time errors that could occur here, are moved to runtime instead of DAG parse time. That's another reason it might be worth thinking about a more parse-time-friendly solution if possible (or working out what's stopping Airflow from supporting this at the current time).

Contributor Author

I'm open to other options. This seemed the cleanest one. We are basically calling init again so that we capture default_args from the dag. The other option is to manually implement default_args here.

Also note that we will get parse errors in the declaration if there are any. So this will be for default_args itself.

Member

What use case/code path is this enabling?

Contributor Author

Defining a task decorated operator without a DAG and adding it to the DAG on __call__.

@task
def add_2(num):
    return num + 2

with DAG(...):
    add_2(2)

Otherwise this does not work. Also, if we define default_args in the DAG, we won't be able to capture them either.

Member

Oh gotcha, that is a good workflow to enable.

Is this covered in tests? (Sorry, finding it hard to keep track of all the discussions at the moment)

Contributor Author

Same tbh. I think given the discussion on calling a decorated function several times, I'll refactor to accommodate that. This should simplify the code a bit as well.

Contributor Author

Also, not sure if we should use the same approach as parameterized, since you may want to run an operation twice with the same args/kwargs. Also, if the arg/kwarg is an XComArg, what should we do?

@turbaszek turbaszek added the AIP-31 Task Flow API for nicer DAG definition label May 22, 2020
airflow/operators/python.py (outdated, resolved)

airflow/operators/python.py (resolved)
airflow/operators/python.py (outdated, resolved)
airflow/ti_deps/deps/trigger_rule_dep.py (resolved)
Member

@ashb ashb left a comment

This will be ace!

airflow/operators/python.py (outdated, resolved)

airflow/operators/python.py (resolved)
Comment on lines 438 to 443
do_run()
assert ['do_run'] == self.dag.task_ids
do_run_1 = do_run.copy()
do_run_2 = do_run.copy()
assert do_run_1.task_id == 'do_run__1'
assert do_run_2.task_id == 'do_run__2'
Member

Suggested change
-do_run()
-assert ['do_run'] == self.dag.task_ids
-do_run_1 = do_run.copy()
-do_run_2 = do_run.copy()
-assert do_run_1.task_id == 'do_run__1'
-assert do_run_2.task_id == 'do_run__2'
+run = do_run()
+do_run_1 = do_run.copy()
+do_run_2 = do_run.copy()
+assert ['do_run', 'do_run__1', 'do_run__2'] == self.dag.task_ids
+assert run.task_id == 'do_run'
+assert do_run_1.task_id == 'do_run__1'
+assert do_run_2.task_id == 'do_run__2'

(I think)

Contributor Author

mostly, except:

assert do_run.task_id == 'do_run'

run here is an XComArg representing the value returned in the do_run operator.

def return_dict(number: int):
    return {
        'number': number + 1,
        43: 43
Member

We should test (and work out what we want) when you do return { 43: 43, '43': 42 }

Contributor Author

XCom automatically casts keys to strings. That's the reason for adding casting in XComArg.

Member

Yes, I mean we should define, and test, what we want the behaviour to be in this case.

Contributor Author

Ooh I see, sorry, I did not fully understand the issue. Not sure what the best path for this may be. We can raise an exception (not great) or log the weird usage.
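The ambiguity under discussion can be made concrete with a two-line sketch: casting keys to str on push makes 43 and '43' collide, so one value silently overwrites the other.

```python
# What a decorated function might return with multiple_outputs enabled.
returned = {43: 43, '43': 42}

# Simulate the push-time cast to string keys.
pushed = {str(k): v for k, v in returned.items()}
# Only one entry survives; which value wins depends on insertion order.
```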

tests/operators/test_python.py (resolved)
tests/operators/test_python.py (outdated, resolved)
tests/operators/test_python.py (outdated, resolved)
@ashb
Member

ashb commented May 22, 2020

To copy what I put in Slack: thanks to Stack Overflow, I've got a way of having from airflow import task be callable and still work as a submodule:

diff --git airflow/task/__init__.py airflow/task/__init__.py
index 114d189da..0b15c9a8c 100644
--- airflow/task/__init__.py
+++ airflow/task/__init__.py
@@ -16,3 +16,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import sys
+import types
+
+
+class CallableModule(types.ModuleType):
+    def __init__(self):
+        types.ModuleType.__init__(self, __name__)
+        self.__dict__.update(sys.modules[__name__].__dict__)
+
+    __all__ = list(set(vars().keys()) - {'__qualname__'})   # for python 2 and 3
+
+    def __call__(self, *args, **kwargs):
+    from airflow.decorators import task
+        return task(*args, **kwargs)
+
+sys.modules[__name__] = CallableModule()

This works in Py 2.7, 3.7 and 3.8 (versions I have easy access to).

The main question is: do we want to support that? Is this "hack" worth it, and do IDEs get massively confused by this?
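A standalone demonstration of the "callable module" trick from the diff above (the module name callable_demo is synthetic, and the __call__ body is a placeholder for delegating to the task decorator): replace a module's entry in sys.modules with a ModuleType subclass that defines __call__.

```python
import sys
import types

# Create a plain synthetic module and register it.
mod = types.ModuleType('callable_demo')
sys.modules['callable_demo'] = mod


class CallableModule(types.ModuleType):
    def __init__(self, wrapped):
        super().__init__(wrapped.__name__)
        # Preserve the original module's attributes.
        self.__dict__.update(wrapped.__dict__)

    def __call__(self, *args, **kwargs):
        # In the Airflow diff this would delegate to the task decorator.
        return ('called', args, kwargs)


# Swap in the callable subclass instance.
sys.modules['callable_demo'] = CallableModule(mod)

import callable_demo  # resolves to the CallableModule instance

result = callable_demo('x', key=1)
```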

@ashb
Member

ashb commented May 22, 2020

The main question is: do we want to support that? Is this "hack" worth it, and do IDEs get massively confused by this?

I think if IDEs support it, and we add tests for it I'd say, yes.

I wonder if we can do this in airflow/__init__.py (adding the task decorator import)

STATICA_HACK = True
globals()['kcah_acitats'[::-1].upper()] = False
if STATICA_HACK:  # pragma: no cover
    from airflow.models.dag import DAG
    from airflow.exceptions import AirflowException
    from airflow.decorators import task

Without confusing IDEs more.

@casassg
Contributor Author

casassg commented May 22, 2020

I prefer to avoid doing that. It seems to add quite a bit of code complexity, and I doubt it will significantly impact the user experience. I think it should be fine for users to use airflow.decorators.task

airflow/operators/python.py (resolved)
docs/concepts.rst (resolved)
Comment on lines 186 to 222
@staticmethod
def _get_unique_task_id(task_id: str, dag: Optional[DAG]) -> str:
    dag = dag or DagContext.get_current_dag()
    if not dag or task_id not in dag.task_ids:
        return task_id
    core = re.split(r'__\d+$', task_id)[0]
    suffixes = sorted(
        [int(re.split(r'^.+__', task_id)[1])
         for task_id in dag.task_ids
         if re.match(rf'^{core}__\d+$', task_id)]
    )
    if not suffixes:
        return f'{core}__1'
    return f'{core}__{suffixes[-1] + 1}'
Member

@feluelle feluelle May 28, 2020

You are using f-strings here, too, not only in examples.

Contributor Author

We also use type annotations, which are Python 3 only. We should probably decide whether we want to support Python 2 at all.

Member

We can use f-strings

Member

Yeah, we can use f-strings here, and manually fix it up when backporting -- it's not too much work on the release-manager/who ever does the backport.

When it comes time to backport this I can run someone through the process (or better yet, I should document this process)

@casassg
Contributor Author

casassg commented May 28, 2020

  1. Switched to use a factory method (aka reuse the function several times in a DAG). Cleaned up a lot of code that had to do with this decision.
  2. Added a check for multiple_outputs keys. Now if any key is not a string, it will fail. This reduces weird cases where users may accidentally duplicate keys when they are cast to strings. Less flexible, but also easier to see what's happening below in the XCom table.
  3. Reverted the string casting in XComArg, as we no longer need it now that we enforce keys to always be strings.
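The stricter check described in point 2 can be sketched as follows (the function name and exception type here are illustrative, not the PR's exact code): with multiple_outputs, every returned key must already be a string, so we fail fast instead of silently casting and risking collisions.

```python
def validate_multiple_outputs(return_value):
    """Reject non-dict returns and non-string keys up front."""
    if not isinstance(return_value, dict):
        raise TypeError('multiple_outputs requires a dict return value')
    for key in return_value:
        if not isinstance(key, str):
            raise TypeError(f'returned dict keys must be strings, got {key!r}')
    return return_value
```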

airflow/operators/python.py (outdated, resolved)
airflow/operators/python.py (outdated, resolved)
airflow/operators/python.py (outdated, resolved)
airflow/operators/python.py (resolved)

airflow/operators/python.py (outdated, resolved)
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. Will try to wrap operator into DAG at declaration or
on function invocation. Use alias to reuse function in the DAG.
Member

Is this still valid?

Comment on lines 186 to 188
kwargs['task_id'] = self._get_unique_task_id(kwargs['task_id'], kwargs.get('dag', None))
super().__init__(**kwargs)
Member

Suggested change
-kwargs['task_id'] = self._get_unique_task_id(kwargs['task_id'], kwargs.get('dag', None))
-super().__init__(**kwargs)
+super().__init__(**kwargs)
+self.task_id = self._get_unique_task_id(kwargs['task_id'], kwargs.get('dag', None))

Contributor Author

Why is this preferred?

Member

No strong opinion here, I just think this is more explicit

Contributor Author

Never mind, it seems this proposed change fails because task_id is checked for duplication in super().__init__(**kwargs).

Resolving.

Member

@turbaszek turbaszek Jun 11, 2020

I see another issue here. Currently, if task_id is not provided, the user will get KeyError: 'task_id' instead of TypeError: __init__() missing 1 required positional argument: 'task_id'

Also, this seems to work as expected:

In [8]: class CustomOp(BaseOperator):
   ...:     def __init__(self, a, b, *args, **kwargs):
   ...:         super().__init__(*args, **kwargs)
   ...:         self.task_id = "other task id"
   ...:

In [9]: op = CustomOp(a=1, b=2, task_id="task_id")

In [10]: op.task_id
Out[10]: 'other task id'

Contributor Author

_PythonFunctionalOperator is a private operator, i.e. it should only be used via @task, which always sets the task_id.

Will make the field mandatory just in case.

airflow/operators/python.py (resolved)
airflow/operators/python.py (resolved)
@turbaszek
Member

Hi @casassg is there any way we can help you with moving on? 😉

@casassg
Contributor Author

casassg commented Jun 3, 2020

Mostly by adding more hours to my day. Sorry, I've been a bit busy this week. Will try to address most comments.

@casassg casassg force-pushed the feature/task_decorator branch 2 times, most recently from 364243d to ca98528 Compare June 10, 2020 00:38
@turbaszek
Member

@casassg flake8 is sad, if you wish I can fix it

@casassg
Contributor Author

casassg commented Jun 10, 2020

I think I fixed it with the latest commit. Feel free to push a commit fixing it if it doesn't. Ran pre-commit locally and it passed.

@turbaszek turbaszek requested review from feluelle, aoen and ashb June 11, 2020 09:06
@turbaszek
Member

Two small comments, otherwise it looks really good! I hope we will be able to merge it soon 🚀 It would be good to have another look from someone else. @dimberman @feluelle @kaxil @mik-laj WDYT?

casassg and others added 16 commits June 23, 2020 12:02
Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Felix Uellendall <feluelle@users.noreply.github.com>
@casassg
Contributor Author

casassg commented Jun 23, 2020

Rebased from latest master to see if integration tests are fixed.

@kaxil kaxil merged commit 23faab5 into apache:master Jun 23, 2020
@boring-cyborg

boring-cyborg bot commented Jun 23, 2020

Awesome work, congrats on your first merged pull request!

@kaxil
Member

kaxil commented Jun 23, 2020

Great work @casassg 🎉

@dimberman
Contributor

This is gonna be awesome! Thank you @casassg !

@casassg
Contributor Author

casassg commented Jun 23, 2020

Yay! Thanks everyone for the patience and thorough review 🎉

@casassg casassg deleted the feature/task_decorator branch June 23, 2020 22:21
@potiuk
Member

potiuk commented Jun 24, 2020

Just in time for the Summit!

@turbaszek
Member

@casassg thanks for your work! 🚀

@evgenyshulman
Contributor

@casassg great initiative and awesome implementation!

kaxil pushed a commit to kaxil/airflow that referenced this pull request Jun 27, 2020
Labels
AIP-31 Task Flow API for nicer DAG definition

Successfully merging this pull request may close these issues.

[AIP-31] Create @task decorator for functionally defined operators
[AIP-31] Create PythonFunctionalOperator
9 participants