
[8057] [AIP-31] Add @task decorator #5

Closed · wants to merge 28 commits
Conversation

@casassg (Collaborator) commented Apr 26, 2020

Airflow AIP-31 task decorator implementation. This decorator should make it easy to wrap a function into an operator and use it as one. Closes airflow#8057 + airflow#8056.

  • Should be usable without args, or with args/kwargs for the underlying operator:

    @task
    def simple_task(...):
        ...

    @task(dag=dag)
    def simple_task(...):
        ...

  • Task ID should be the function name by default.
  • Should wrap the function so that autocomplete can still suggest parameters correctly.
  • The decorator should return an instance of PythonFunctionalOperator. This can be used to set task dependencies, e.g.:

    @task
    def simple_task(...):
        pass

    simple_task >> another_task
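For context, the dual bare/parameterized usage described above can be sketched in plain Python. PythonFunctionalOperator is stubbed here as a bare class; everything in this sketch is illustrative, not the PR's actual code:

```python
import functools

class PythonFunctionalOperator:
    """Stand-in for the PR's operator class (illustrative only)."""
    def __init__(self, python_callable, task_id=None, **kwargs):
        self.python_callable = python_callable
        # Task ID defaults to the function name, per the bullet above
        self.task_id = task_id or python_callable.__name__
        self.op_kwargs = kwargs

def task(_func=None, **op_kwargs):
    """Usable both as bare @task and as @task(dag=dag)."""
    def wrapper(f):
        op = PythonFunctionalOperator(python_callable=f, **op_kwargs)
        # Copy __name__/__doc__/__module__ onto the instance so tooling
        # (autocomplete, repr) still sees the wrapped function's metadata
        functools.update_wrapper(op, f, updated=[])
        return op
    if _func is not None:  # bare @task usage
        return wrapper(_func)
    return wrapper         # parameterized @task(...) usage
```

The trick is the `_func=None` first parameter: bare `@task` passes the function directly, while `@task(...)` returns the inner wrapper to be applied next.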

Make sure to mark the boxes below before creating the PR:

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.


ToDo:

@casassg casassg mentioned this pull request Apr 27, 2020
self.multiple_outputs = multiple_outputs


def __call__(self, *args, **kwargs):


This is probably the main discussion point for this PR: the @task => Operator instance approach.
A. We need to make sure that the user doesn't use the same operator twice:

@task
def some_task(a):
    pass

a = some_task(1)
b = some_task(2)  # second usage with a different param - Error, it would change the first call as well

Solution: raise an error on more than one call and make sure the user calls .alias on every extra call.

B. If the dag.task decorator is not used, the operator has to be assigned to a dag.
If the user writes some_task() in dag scope, they probably expect it to be assigned to the current "with DAG" context (if @task is applied inside the context, this happens automatically), but usually people will not want to define their functions inside the "with DAG" context.
C. Right now a user is not able to change the operator configuration around some function (BaseOperator has 20 different params, so it's not something hypothetical: pool, start_date, and others).
Solution: something similar to .alias with a copy constructor that can modify specific kwargs.

I am probably too biased to suggest a specific approach here. I like the current @task implementation as it keeps everything very simple; at the same time, the "@task as factory" approach can be much more flexible in the future.

casassg (Collaborator, Author):

Yup, this definitely needs some extra work. I want to make sure we are all aligned in the conversation around task -> PythonFunctionalOperator vs task -> factory of PythonFunctionalOperator.

A. Totally agree, I can add this.
B. We can get the DAG from DagContext, similarly to how it's done during initialization, in the __call__ method if the operator has no dag assigned, and assign it there.
C. True, we need to capture the *args/**kwargs at the beginning and pass them to the copy as well (or maybe copy all class attributes? Not sure what's the cleanest approach here).
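Point B could be sketched like this. DagContext is stubbed as a plain stack below; the real airflow.models.dag.DagContext API may differ, so treat names and signatures as assumptions:

```python
class DagContext:
    """Toy stand-in for Airflow's 'with DAG(...)' context stack (assumed API)."""
    _stack = []

    @classmethod
    def push_context_managed_dag(cls, dag):
        cls._stack.append(dag)

    @classmethod
    def pop_context_managed_dag(cls):
        return cls._stack.pop()

    @classmethod
    def get_current_dag(cls):
        return cls._stack[-1] if cls._stack else None

class PythonFunctionalOperator:
    """Only the dag-assignment part of __call__ is sketched here."""
    def __init__(self, dag=None):
        self.dag = dag

    def __call__(self, *args, **kwargs):
        # If no dag was bound at decoration time, fall back to the DAG
        # that is active at *call* time (point B above)
        if self.dag is None:
            self.dag = DagContext.get_current_dag()
        self._op_args, self._op_kwargs = args, kwargs
        return self
```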


B. This is exactly what is missing.
C. There is probably no "cleanest" one; we will have to compromise, so whatever you do will work.

# return XComArg(self)

def alias(self, task_id: str):
    return PythonFunctionalOperator(


If we create a new operator, it probably should copy all kwargs from the current one.
We can add a copy function via the standard approach, or a hacky one via .copy and params modification.
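One way to sketch that copy (names and structure are assumptions, not the PR's actual code): capture the constructor kwargs once, then replay them in alias with optional overrides:

```python
class PythonFunctionalOperator:
    """Sketch of an alias() that copies constructor kwargs (point C above)."""
    def __init__(self, python_callable, task_id, **kwargs):
        self.python_callable = python_callable
        self.task_id = task_id
        # Capture kwargs up front so copies can replay them later
        self._ctor_kwargs = kwargs

    def alias(self, task_id: str, **overrides):
        """Return a fresh operator under a new task_id; overrides win over the originals."""
        merged = {**self._ctor_kwargs, **overrides}
        return PythonFunctionalOperator(self.python_callable, task_id, **merged)
```

This also covers the "pool, start_date and others" case from point C: any BaseOperator-style kwarg stored at construction can be selectively overridden per alias.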

casassg (Collaborator, Author):

Took a stab at this. Maybe it's a bit too hacky? Also, I need to test this; not sure if it works.


@turbaszek what do you think?
Maybe we can just use def task_id(task_id), so the code will look like
some_op.task_id("new_task_id") (clear for Airflow users, they are used to task_id).


If we go this way I am in favor of .task_id("xxx"). However, I would still consider autogeneration of task_ids.


It looks like we have to do both.

casassg (Collaborator, Author):

I have an idea to generate it by default (and allow custom task_id). Let me write that 😄
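One plausible autogeneration scheme (a guess at the idea, not the PR's actual code): the first usage keeps the bare function name, and subsequent usages get a numeric suffix:

```python
import itertools

def generate_task_id(base: str, existing_ids: set) -> str:
    """First usage keeps the function name; later ones get __1, __2, ... (assumed scheme)."""
    if base not in existing_ids:
        return base
    # Probe suffixes until we find one not yet registered in the DAG
    for i in itertools.count(1):
        candidate = f"{base}__{i}"
        if candidate not in existing_ids:
            return candidate
```

This lets repeated calls to the same decorated function coexist without forcing the user to call .task_id() every time, while still allowing an explicit custom task_id.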

casassg (Collaborator, Author):

Check the __init__ method.

Comment on lines 187 to 201
self._op_args = None
self._op_kwargs = None


Should we add those to the constructor?

casassg (Collaborator, Author):

Given that the intended usage of the operator is through __call__, I would say we can keep those out of the constructor. We have PythonOperator to serve those needs. Not strongly biased though; I thought it made sense to hide them a bit and constrain the usage of the operator a bit more.

I might even wonder if we should make this a private class instead, so that it's only used via the task decorator. Thoughts?

@casassg casassg changed the title Feature/task decorator [AIP-31] Add @task decorator May 1, 2020
@casassg casassg changed the title [AIP-31] Add @task decorator [8057] [AIP-31] Add @task decorator May 1, 2020
@turbaszek

@casassg XComArg is now present in apache/master, can you please rebase? 🚀

return_dict(test_number)

dr = self.dag.create_dagrun(
    run_id="manual__",


Please use DagRunType


Additional info: apache#8227 (comment)
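The suggestion is to derive the run_id prefix from the DagRunType enum rather than hard-coding "manual__". A stand-in enum is used below since the real one lives in airflow.utils.types; member names and values are assumptions based on the discussion:

```python
from enum import Enum

class DagRunType(Enum):
    """Stand-in mirroring airflow.utils.types.DagRunType (values assumed)."""
    MANUAL = "manual"
    SCHEDULED = "scheduled"

# Instead of run_id="manual__", derive the prefix from the enum so the
# string stays consistent with the rest of the codebase:
execution_date = "2020-05-21T00:00:00"
run_id = f"{DagRunType.MANUAL.value}__{execution_date}"
```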

casassg (Collaborator, Author):

Should I rebase on top of your internal branch? Otherwise I can refactor once that PR lands. Leaving this open and adding it as a todo.

@casassg casassg force-pushed the feature/task_decorator branch 4 times, most recently from 218bc24 to afde96c Compare May 20, 2020 23:49
@casassg casassg marked this pull request as ready for review May 20, 2020 23:58
kaxil and others added 3 commits May 21, 2020 01:31
Debian Buster only ships with JDK11, and Hive/Hadoop fails in odd,
hard-to-debug ways (complains about the metastore not being initialized,
possibly related to class loader issues).

Until we rip Hive out of the CI (replacing it with Hadoop in a separate
integration, only enabled for some builds) we'll have to stick with JRE8.

Our previous approach of installing openjdk-8 from Sid/Unstable started
failing as Debian Sid has a new (and conflicting) version of GCC/libc.
The adoptopenjdk package archive is designed for Buster, so it should be
more resilient.
@turbaszek

There's one test failing:

______________________ TestAirflowTask.test_airflow_task _______________________

self = <tests.operators.test_python.TestAirflowTask testMethod=test_airflow_task>

    def test_airflow_task(self):
        """Tests airflow.task decorator to generate task"""
        from airflow import task

        @task
>       def add_2(number: int):
E       TypeError: 'module' object is not callable

tests/operators/test_python.py:619: TypeError

Otherwise LGTM, let's move to apache repo

kaxil and others added 3 commits May 21, 2020 10:51
…pache#8910)

Currently there is no way to determine the state of a TaskInstance in the graph view or tree view for people with colour blindness.

Approximately 4.5% of people experience some form of colour vision deficiency.
kaxil and others added 4 commits May 21, 2020 12:17
All PRs will use cached "latest good" versions of the Python
base images from our GitHub registry. The Python versions in
the GitHub registry will only get updated after a master
build (which pulls the latest Python image from DockerHub) builds
and passes tests correctly.

This is to avoid problems that we had recently with Python
patchlevel releases breaking our Docker builds.
…c dag (apache#8952)

The scheduler_dag_execution_timing script wants to run _n_ dag runs to
completion. However, since the start date of those dags is dynamic (`now
- delta`) we can't pre-compute the execution_dates like we were before.
(This is because the execution_date of the very first dag run would be
`now()` of the parser process, but if we try to pre-compute that in
the benchmark process it would see a different value of `now()`.)

This PR changes it to instead watch for the first _n_ dag runs to be
completed. This should make it work with more dags with fewer changes to
them.
@casassg casassg changed the title [8057] [AIP-31] Add @task decorator [8057] [AIP-31] Add @operator decorator May 21, 2020
@casassg (Collaborator, Author) commented May 21, 2020

Didn't realize it, but it seems that airflow.task already exists as a module. There are two paths forward:

  • Change the name of the decorator from task to operator [pushed this for now]
  • Disable importing it as from airflow import task. We can move it to something like from airflow.decorators import task

Thoughts?
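For reference, the clash is easy to reproduce with stub modules: once airflow.task is a module, `from airflow import task` binds the module object, and applying it as a decorator fails exactly like the CI test above. This is a minimal sketch with fake modules, not real Airflow code:

```python
import types

# Build a fake package where `task` is a submodule, mimicking the clash
airflow = types.ModuleType("airflow")
airflow.task = types.ModuleType("airflow.task")

task = airflow.task  # what `from airflow import task` would bind

try:
    @task  # decorating with a module object raises TypeError
    def add_2(number: int):
        return number + 2
except TypeError as err:
    message = str(err)
```

Exposing the decorator from a dedicated submodule (e.g. airflow.decorators) sidesteps the name collision entirely.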

@casassg casassg force-pushed the feature/task_decorator branch 3 times, most recently from d8cbd8a to c9a401f Compare May 21, 2020 19:55
dimberman and others added 3 commits May 21, 2020 13:20
* Push CI images to Docker package cache for v1-10 branches

This is done as a commit to master so that we can keep the two branches
in sync

Co-Authored-By: Ash Berlin-Taylor <ash_github@firemirror.com>

* Run Github Actions against v1-10-stable too

Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
`field_path` was renamed to `tag_template_field_path` in >=0.8 and there might be other unknown errors
@casassg casassg changed the title [8057] [AIP-31] Add @operator decorator [8057] [AIP-31] Add @task decorator May 21, 2020
add return XComArg
…__call__

handle empty dict/list in sequential calls

revert unnecessary dependency addition
s/Exception/AirflowException

make pythonfunctionaloperator a private class

missing documentation
remove docs

remove linter errors
migrate to pytest

use DagRunType and fix small issue w XComArg

some more pylint fixes

pylint and isort issues

add task into STATICA_HACK

remove cyclic dependency

disable pylint nomember for .key
remove unneeded line

address mypy issues

fix more pylint mypy issues

fix docs

add _called variable

change task decorator name to operator
@casassg (Collaborator, Author) commented May 22, 2020

Closing this for apache#8962
