Queue support for DaskExecutor using Dask Worker Resources #16829

Merged · 16 commits merged into apache:main on Sep 1, 2021

Conversation

@aa1371 (Contributor) commented Jul 6, 2021

This change allows specifying queues while using the DaskExecutor to route tasks to specific subsets of workers. Tests and docs updated.

This can introduce a breaking change: if the dask workers are not started with resources matching the specified queues, the executor now raises an AirflowException, whereas before the queue argument was silently ignored. I think this is an acceptable breaking change since it reduces ambiguity in how the DAGs are executed.
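
For example (queue and resource names below are illustrative, not taken from this PR):

# start workers with a resource named after the Airflow queue they should serve
$ dask-worker <scheduler-address> --resources "highmem=1"

# in the DAG, route a task to that subset of workers
from airflow.operators.bash import BashOperator

task = BashOperator(
    task_id="process",
    bash_command="python process.py",
    queue="highmem",  # must match a resource declared on at least one worker
)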

closes: #16739

boring-cyborg bot added the area:Scheduler and kind:documentation labels Jul 6, 2021
boring-cyborg bot commented Jul 6, 2021

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@aa1371 (Contributor Author) commented Jul 12, 2021

cc @potiuk

@potiuk (Member) commented Jul 19, 2021

Unfortunately I have very little knowledge of the Dask Executor. Anyone who can help with it? (And @aa1371 - can you please rebase to the latest main?)

@aa1371 (Contributor Author) commented Jul 20, 2021

@potiuk - merged
@jlowin @jrbourbeau @fjetter - would someone be willing to take a look at the changes and vet them? Thanks.

Comment on lines 86 to 88
avail_resources = self.client.run_on_scheduler(
lambda dask_scheduler: dask_scheduler.resources
)
@fjetter commented:
This technically works and it is there to stay, but we usually use the run_on_scheduler method only for debugging purposes.
There is also the possibility to disable this method entirely on the scheduler to prohibit execution of malicious code, since it would otherwise allow the execution of arbitrary code.

I see two possibilities:

  1. Contribute a get_resources method to distributed itself. I think this is a fair feature request
  2. Use a scheduler plugin (see https://distributed.dask.org/en/latest/plugins.html#distributed.diagnostics.plugin.SchedulerPlugin)

The scheduler plugin and the contribution would use the same code; it's just a question of where the code lives. It should be as simple as:

from distributed.diagnostics.plugin import SchedulerPlugin

class GetResourcesPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.scheduler.handlers["get-resources"] = self.get_resources

    def get_resources(self):
        return self.scheduler.resources

On the client side it's then simply:

client.scheduler.get_resources

It's a bit of boilerplate, but this ensures that it will always work, even in locked-down clusters.

@aa1371 (Contributor Author) commented Jul 21, 2021

Cool, didn't know about the scheduler plugins, so thanks for that. Not sure what I'm doing wrong, but it doesn't quite seem to work:

# file.py
import click
from distributed.diagnostics.plugin import SchedulerPlugin

class GetResourcesPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.scheduler.handlers["get-resources"] = self.get_resources

    def get_resources(self):
        return self.scheduler.resources

@click.command()
def dask_setup(scheduler):
    plugin = GetResourcesPlugin(scheduler)
    scheduler.add_plugin(plugin)
    print(scheduler.handlers)  # I see the desired handler added here
    # print(scheduler.get_resources())  # get an error since this method doesn't exist, so tried without it too

# start the scheduler
$ dask-scheduler --preload file.py

# on the client
from distributed import Client

c = Client('<scheduler address>')
c.scheduler.get_resources()  # doesn't seem to work

Any ideas what I'm doing wrong here?

In any case, I think suggestion (1), a client.get_worker_resources() method, makes the most sense. Since knowing what worker resources are available is relevant to how you would submit tasks from the client, I think it would be a good idea to be able to access this directly. If you agree, I'll be happy to set up a PR/tests for this.

@fjetter commented:

scheduler.get_resources()

The scheduler will not have the method, but the plugin does. By registering a handler you are registering an RPC with the scheduler.
The client.scheduler is, in fact, not the scheduler but a PooledRPCCall, which means that any attribute access on that object is translated into an RPC to the scheduler. Therefore client.scheduler.handler_name(handler_argument) will execute the method registered under the handler handler_name with the argument handler_argument. In this specific case it should read

client.scheduler.get_resources()  # There are no arguments here

In fact, you might need to wrap this with the sync method since technically the RPC call will be async. Try

client.sync(client.scheduler.get_resources())

@aa1371 (Contributor Author) commented:

Cool, thanks! That helped me get it working with a couple of extra tweaks.

The get_resources method of the plugin class needed to accept an argument (it looks like a comm.tcp.TCP object gets passed in to it). I also needed to call client.sync like so: client.sync(client.scheduler.get_resources), without invoking get_resources.
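
Putting those tweaks together, the working version looks roughly like this (a sketch, not verbatim from my branch; the comm parameter is just whatever connection object distributed passes to handlers):

from distributed import Client
from distributed.diagnostics.plugin import SchedulerPlugin

class GetResourcesPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.scheduler.handlers["get-resources"] = self.get_resources

    def get_resources(self, comm):  # handlers receive a comm object as their first argument
        return self.scheduler.resources

def dask_setup(scheduler):
    scheduler.add_plugin(GetResourcesPlugin(scheduler))

# client side: pass the bound method itself, not its result, to client.sync
client = Client('<scheduler address>')
resources = client.sync(client.scheduler.get_resources)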

That said, how do you feel about client.get_worker_resources, for the reason I listed above? I think that would be the better approach, since that's something users should easily be able to look up.

@fjetter commented:

That said, how do you feel about client.get_worker_resources, for the reason I listed above? I think that would be the better approach, since that's something users should easily be able to look up.

As I said, I think this is a sane addition and the code is identical. Since you figured out how this works now, would you be interested in contributing this to distributed?

@aa1371 (Contributor Author) commented:

Yea, will open a PR soon. Thanks.

@aa1371 (Contributor Author) commented:

@fjetter - PR (with tests) added
dask/distributed#5105

@aa1371 (Contributor Author) commented Jul 22, 2021

@fjetter - updated to use @mrocklin's suggestion of client.scheduler_info() (suggested here: dask/distributed#5105)
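
Roughly, the lookup now works like this (a sketch; it assumes each worker entry in scheduler_info() reports its declared resources under a 'resources' key):

from distributed import Client

client = Client('<scheduler address>')
info = client.scheduler_info()
# collect every resource name declared across the connected workers
available_resources = {
    resource
    for worker in info['workers'].values()
    for resource in worker.get('resources', {})
}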

@aa1371 (Contributor Author) commented Jul 28, 2021

@potiuk - any further validation you want to consider on this PR? It seems like the only comment @fjetter had on the PR was about the use of client.run_on_scheduler, which has since been refactored to use a more consistent API - client.scheduler_info.

@potiuk (Member) commented Aug 1, 2021

I've run it now, let's see where it gets.

@potiuk (Member) commented Aug 2, 2021

Static checks are failing.

@aa1371 (Contributor Author) commented Aug 10, 2021

@potiuk - fixed static check errors

@potiuk (Member) commented Aug 10, 2021

I am afraid it should be rebased to the latest main.

@aa1371 (Contributor Author) commented Aug 12, 2021

updated

@aa1371 (Contributor Author) commented Aug 17, 2021

Any ideas why one of the "Build Images" checks was cancelled? I don't see anything obvious in the details/logs. And is this consequential to moving forward with the review process/merging the PR? (I see similar cancelled CI tasks in recently merged PRs.)

@potiuk (Member) commented Aug 21, 2021

I am afraid you have errors again (for the Dask tests). I heartily recommend installing Breeze and running the tests there if those tests work in your local environment. This way you can easily reproduce the problems.

@aa1371 (Contributor Author) commented Aug 21, 2021

Understood, and yeah, I see the issue. It looks like the default queue_name in the conf is "default", as opposed to undefined (potentially an artifact of Celery). How do you think we should handle this with respect to the DaskExecutor? It seems that as of right now a default needs to be defined, or else BaseOperator.__init__ will fail on getting the default queue.

Several ways I can see handling this:

  1. Make queue='default' a special case in DaskExecutor, which just acts as if no queue name was passed in
  2. Require users of the DaskExecutor to specify an empty default_queue value in the config (if they don't want to use queues)
  3. Update the base config/BaseOperator, to provide an undefined queue to start with

I'm thinking option 1 is probably the most straightforward way to go (rough sketch of what I mean below). Thoughts @potiuk?
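
For illustration only (this is a sketch of option 1, not the merged code; the method follows BaseExecutor's execute_async signature):

import subprocess

def execute_async(self, key, command, queue=None, executor_config=None):
    resources = None
    if queue is not None and queue != 'default':  # treat 'default' as "no queue was requested"
        resources = {queue: 1}  # only workers started with --resources "<queue>=1" will run it
    future = self.client.submit(subprocess.check_call, command, pure=False, resources=resources)
    self.futures[future] = key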

@potiuk (Member) commented Aug 21, 2021

Absolutely 1) - 'default' should be treated as "special" in the DaskExecutor.

Just to explain a bit of context and set expectations:

The whole DaskExecutor integration is (and should be) an afterthought for Airflow - to be perfectly honest. The DaskExecutor is very rarely used, and I think there are no committers who are in any capacity familiar with it, because it is - to be perfectly blunt - a total niche for Airflow. Last time we reached out to the Dask community about keeping it, and the only reason the DaskExecutor remained in Airflow 2 was that people from the Dask community contributed some fixes to keep the tests from failing.

And this is the only way to keep it "in", to be honest. We need people from the Dask users' community to make sure that the DaskExecutor is good to go.

So the DaskExecutor should definitely follow the "Airflow" conventions and adapt to Airflow, rather than Airflow adapting to it. The Celery Executor is by far the most popular one in Airflow (https://airflow.apache.org/blog/airflow-survey-2020/) and the Dask Executor is not even mentioned there. So yeah, adapting the DaskExecutor to accept default as a special value sounds about right.

@aa1371 (Contributor Author) commented Aug 21, 2021

Gotcha, thanks for the context. That all makes sense. Pushed a fix just now.

@potiuk (Member) commented Aug 21, 2021

I heartily recommend installing pre-commits (https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#pre-commit-hooks) - it saves a LOT of time on iterations.
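
For anyone following along, the usual setup is just:

$ pip install pre-commit
$ pre-commit install          # run the hooks automatically on every commit
$ pre-commit run --all-files  # or run them once across the whole tree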

@aa1371 (Contributor Author) commented Aug 21, 2021

Yes indeed, thanks. Passing the pre-commit checks and dask_executor tests now. The last CI run here failed on a SQL deadlock error; I'm guessing those are transient and can be ignored or rerun?

@aa1371 (Contributor Author) commented Aug 30, 2021

@potiuk would we be able to run the rest of the CI again?

@aa1371 (Contributor Author) commented Aug 30, 2021

@potiuk - thanks for rerunning. Looks like the only failures in the tests are some AWS SQS errors, unrelated to this PR. Is this a known issue atm?

@potiuk (Member) commented Aug 30, 2021

I think it will need another rebase :(

@aa1371 (Contributor Author) commented Aug 31, 2021

Updated - can we run again, @potiuk? Thanks. Also, is there a better process for iterating in these kinds of situations? I don't want to keep bothering you just to rerun due to transient issues.

@potiuk (Member) commented Aug 31, 2021

Updated - can we run again, @potiuk? Thanks. Also, is there a better process for iterating in these kinds of situations? I don't want to keep bothering you just to rerun due to transient issues.

This happens because you are a relatively new GitHub user and this is your first contribution here. It will go away once your first change is merged. This is something we cannot disable (but we did configure it to only run for first-time contributors who are "new" GitHub users). I do not know, though, what "new" means for GitHub: https://github.blog/changelog/2021-04-22-github-actions-maintainers-must-approve-first-time-contributor-workflow-runs/

@potiuk (Member) commented Aug 31, 2021

One way for you to deal with it is to pick a "simple" fix and get it merged faster :D. There are some issues here: https://github.com/apache/airflow/contribute

@aa1371 (Contributor Author) commented Sep 1, 2021

Gotcha, I'll probably tackle this one (#17762) if @lformant doesn't want to, since I already have a working solution and it's a simple change.
Looks like all the tests ran successfully for now, though. Anything else we need to consider/review, @potiuk?

@potiuk (Member) left a comment

LGTM. I don't use dask and cannot check it, but it looks totally reasonable.

github-actions bot added the full tests needed label Sep 1, 2021
github-actions bot commented Sep 1, 2021

The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR and push it with --force-with-lease.

@potiuk potiuk merged commit 226a2b8 into apache:main Sep 1, 2021
boring-cyborg bot commented Sep 1, 2021

Awesome work, congrats on your first merged pull request!

@potiuk (Member) commented Sep 1, 2021

Gotcha, I'll probably tackle this one (#17762) if @lformant doesn't want to, since I already have a working solution and it's a simple change.

I guess it's not needed now, since your first PR is merged :D. But you are absolutely welcome to do so!

@aa1371 (Contributor Author) commented Sep 1, 2021

Thanks for all the time and help @potiuk

jedcunningham added a commit to astronomer/airflow that referenced this pull request Oct 4, 2021
Add a note about the behavior change introduced in apache#16829.
kaxil pushed a commit that referenced this pull request Oct 5, 2021
Add a note about the behavior change introduced in #16829.