[AIRFLOW-862] Add DaskExecutor #2067

Closed
jlowin wants to merge 1 commit into apache:master from jlowin:dask-executor

@jlowin
Member

@jlowin jlowin commented Feb 11, 2017

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:

Description:
The Dask Distributed subproject makes it incredibly easy to create clusters of Python workers. Distributed is pure-Python, doesn't require an external database, has a built-in monitoring UI, and can be run anywhere from a laptop to thousands of networked cores. This Executor allows Airflow to execute tasks in a Dask cluster.

To quickly get started with a cluster, see the instructions in the Airflow configuration docs.
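
For readers unfamiliar with the pattern, here is a minimal sketch of how an executor of this kind can hand work to a pool of workers and later reconcile the results. It is not the code in this PR: the standard library's `ThreadPoolExecutor` stands in for a Dask client so the sketch runs without a cluster, and the names `SketchExecutor`, `execute_async`, `sync`, and `end` are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


class SketchExecutor:
    """Illustrative only: submits callables to a pool and tracks their state."""

    def __init__(self, max_workers=2):
        # Stand-in for a cluster client (an assumption, not the PR's code).
        self.pool = ThreadPoolExecutor(max_workers=max_workers)
        self.futures = {}  # future -> task key
        self.states = {}   # task key -> "success" or "failed"

    def execute_async(self, key, fn):
        # Hand the work to the pool and remember which task the future belongs to.
        future = self.pool.submit(fn)
        self.futures[future] = key

    def sync(self):
        # Record the outcome of every finished future; leave running ones alone.
        for future in list(self.futures):
            if future.done():
                key = self.futures.pop(future)
                failed = future.exception() is not None
                self.states[key] = "failed" if failed else "success"

    def end(self):
        # Wait for outstanding work, then collect the final states.
        self.pool.shutdown(wait=True)
        self.sync()


def _fail():
    raise ValueError("boom")


executor = SketchExecutor()
executor.execute_async("ok_task", lambda: 0)
executor.execute_async("bad_task", _fail)
executor.end()
```

After `end()` returns, `executor.states` maps each task key to its final state, which is the information a scheduler would need to mark task instances as succeeded or failed.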

Testing Done:

  • Added unit tests to backfill example dags with the new executor

@jlowin jlowin force-pushed the dask-executor branch 2 times, most recently from 420f8cb to e4530c9, February 11, 2017 04:52
@codecov-io

codecov-io commented Feb 11, 2017

Codecov Report

Merging #2067 into master will decrease coverage by 0.55%.
The diff coverage is 0%.

@@            Coverage Diff            @@
##           master   #2067      +/-   ##
=========================================
- Coverage   67.54%     67%   -0.55%     
=========================================
  Files         139     140       +1     
  Lines       10581   10625      +44     
=========================================
- Hits         7147    7119      -28     
- Misses       3434    3506      +72
Impacted Files                          Coverage Δ
airflow/configuration.py                84.42% <ø> (ø)
airflow/executors/dask_executor.py      0% <ø> (ø)
airflow/executors/__init__.py           58.06% <ø> (-9.68%)
airflow/executors/celery_executor.py    0% <ø> (-45%)
airflow/www/views.py                    69.7% <ø> (ø)
airflow/models.py                       86.16% <ø> (+0.05%)
airflow/jobs.py                         73.01% <ø> (+0.1%)

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update bbfd43d...2dddb2a. Read the comment docs.

@jlowin jlowin force-pushed the dask-executor branch 2 times, most recently from d9c00a0 to 0e0cd7e, February 11, 2017 23:14
Contributor

Shouldn't we handle this the same way as the MesosExecutor (the CeleryExecutor should be updated as well), i.e. make loading the module depend on the configuration rather than on whether the module is available?
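
The suggestion above can be sketched as follows: choose the module to import from the configured executor name, so that an optional dependency only matters when that executor is actually selected. The registry, the `load_executor` helper, and the use of standard-library classes as stand-ins are all assumptions for illustration; this is not Airflow's actual loader.

```python
import importlib

# Hypothetical registry mapping a configured name to (module path, class name).
# Standard-library classes stand in for real executors so the sketch runs anywhere.
EXECUTOR_REGISTRY = {
    "ThreadExecutor": ("concurrent.futures", "ThreadPoolExecutor"),
    "ProcessExecutor": ("concurrent.futures", "ProcessPoolExecutor"),
    "MissingExecutor": ("module_that_is_not_installed", "Whatever"),
}


def load_executor(configured_name):
    """Import only the module that the configuration asks for."""
    if configured_name not in EXECUTOR_REGISTRY:
        raise ValueError("Unknown executor: %s" % configured_name)
    module_path, class_name = EXECUTOR_REGISTRY[configured_name]
    # The import happens here, not at module load time, so the optional
    # dependencies of executors that were not selected are never touched.
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


executor_class = load_executor("ThreadExecutor")
```

With this layout, selecting `"MissingExecutor"` raises an `ImportError` at load time, which is arguably the clearer failure: the user asked for something that is not installed.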

Contributor

Please add some real unit tests (these are integration tests) that test the functions and their expected inputs and outputs.

Member Author

Added tests for both the Executor functions and the process of manually stepping through running TaskInstances (simulating a job).

Contributor

@bolkedebruin bolkedebruin Feb 12, 2017

Does Dask not support queues? Then this should be noted in the documentation and it should be logged as a warning if the queue is anything different from the default.

Member Author

Updated to issue a warning
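
A warning of this kind might look like the sketch below. The function signature, the `DEFAULT_QUEUE` value, and the message text are assumptions for illustration, not the code added in this PR.

```python
import warnings

DEFAULT_QUEUE = "default"  # assumed default queue name, for illustration only


def execute_async(key, command, queue=None):
    """Accept a task, but warn when a non-default queue is requested."""
    if queue not in (None, DEFAULT_QUEUE):
        warnings.warn(
            "This executor does not support queues; "
            "ignoring queue %r for task %r." % (queue, key)
        )
    # A real executor would submit `command` here; the sketch just returns it.
    return command


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    execute_async("task_a", "echo a")                   # no queue: silent
    execute_async("task_b", "echo b", queue="default")  # default queue: silent
    execute_async("task_c", "echo c", queue="gpu")      # other queue: warns
```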

Contributor

Nice. Please also add it to the documentation.

@jlowin
Member Author

jlowin commented Feb 12, 2017

@bolkedebruin thanks for the comments -- I've updated the PR, let me know if it's in line.

Contributor

Is that required? This way it still "breathes" integration test rather than unit test.

Member Author

@jlowin jlowin Feb 12, 2017

I hear you -- here are the full details:

  1. The function is being executed in the cluster, so if we proceed immediately it might not have time to complete
  2. We could check the future in a while loop until it completes, but we'd have to wrap that in a timeout just in case there is an error communicating with the cluster
  3. Therefore, my thought was to skip the while loop and just block for a moment to see if the future completes.

However, for completeness I'll put that in the code; better to be explicit! One more commit coming.
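
The trade-off described above, polling until the future completes but never longer than a deadline, can be sketched like this. `ThreadPoolExecutor` stands in for the cluster, and `wait_until_done` and its parameters are hypothetical names, not part of this PR.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_until_done(future, timeout=5.0, poll_interval=0.05):
    """Poll `future` until it finishes or `timeout` seconds elapse.

    Returns True if the future completed in time, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while not future.done():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True


with ThreadPoolExecutor(max_workers=1) as pool:
    # A task that finishes almost immediately.
    quick = pool.submit(lambda: "finished")
    completed = wait_until_done(quick, timeout=5.0)
    # A task that outlives the (deliberately short) deadline.
    slow = pool.submit(time.sleep, 0.5)
    timed_out = not wait_until_done(slow, timeout=0.05, poll_interval=0.01)
```

Returning a boolean instead of raising lets a test assert on the outcome directly, so a cluster communication problem shows up as a failed assertion rather than a hang.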

@bolkedebruin
Contributor

I'm still a bit picky about the unit tests; the rest, apart from the doc update, LGTM.

@jlowin jlowin force-pushed the dask-executor branch 2 times, most recently from 0a3ad80 to b920d5e, February 12, 2017 16:23
Adds a DaskExecutor for running Airflow tasks
in Dask clusters.
@bolkedebruin
Contributor

@jlowin I just noticed that Codecov reported decreased coverage and Travis is reporting this:

test_backfill_integration (tests.DaskExecutorTest) ... SKIP: Dask unsupported by this configuration
test_dask_executor_functions (tests.DaskExecutorTest) ... SKIP: Dask unsupported by this configuration
test_submit_task_instance_to_dask_cluster (tests.DaskExecutorTest) ... SKIP: Dask unsupported by this configuration

Can you fix that please?

@jlowin
Member Author

jlowin commented Feb 13, 2017

Shoot -- it was the change we made to executors.__init__.py, which stops DaskExecutor from being imported unless airflow.cfg is set to use Dask. That means the line `from airflow.executors import DaskExecutor` fails in the unit tests, which then trips my logic that skips the tests when Dask isn't installed.

Submitting a fix immediately.
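
The failure mode described above is easy to reproduce in isolation. In the sketch below, a guarded import that is meant to detect a missing dependency also swallows an `ImportError` raised for a different reason (here, a name that an installed package does not export), so the test is skipped instead of failing. The imported name and the test class are illustrative assumptions, not code from this PR.

```python
import unittest

try:
    # Stand-in for `from airflow.executors import DaskExecutor`: the package
    # exists, but the name is not exported, so this raises ImportError.
    from json import NameThatIsNotExported  # noqa: F401
    SKIP_TESTS = False
except ImportError:
    # Intended to mean "dependency not installed", but it also fires when
    # the import is merely gated behind configuration or misspelled.
    SKIP_TESTS = True


class GuardedTest(unittest.TestCase):
    @unittest.skipIf(SKIP_TESTS, "Dependency unsupported by this configuration")
    def test_something(self):
        self.assertTrue(True)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(GuardedTest)
result = unittest.TestResult()
suite.run(result)
```

The run reports success with one skipped test, which is why this kind of problem tends to surface through coverage numbers or CI logs rather than through a red build.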

@jlowin
Member Author

jlowin commented Feb 13, 2017

The coverage issue appears to be from the same change, which moved the import statements for "non-standard" executors from the top of the file to inside a block that first checks airflow.cfg. I think what this means is that there have just never been unit tests for the Celery Executor, so it never gets imported (and therefore coverage drops).

@jlowin jlowin deleted the dask-executor branch February 13, 2017 13:05
@bolkedebruin
Contributor

bolkedebruin commented Feb 13, 2017

@jlowin Correct, celery does not have coverage. This is one of the reasons why I would like to really start enforcing unit tests (as opposed to integration tests) and integration tests.

Coverage drops because Dask is being added but not tests. (I am not sure what you meant by your last statement.)

alekstorm pushed a commit to alekstorm/incubator-airflow that referenced this pull request Jun 1, 2017
Adds a DaskExecutor for running Airflow tasks
in Dask clusters.

Closes apache#2067 from jlowin/dask-executor
@tooptoop4
Contributor

@jlowin can distributed versions above v2 work? setup.py limits it to <2.

4 participants