-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New executor #1434
New executor #1434
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor clarifications!
This enables but does not test a threaded executor, is that correct?
@@ -71,7 +71,7 @@ checkpointing = false | |||
[engine.executor] | |||
|
|||
# the default executor, specified using a full path | |||
default_class = "prefect.engine.executors.SynchronousExecutor" | |||
default_class = "prefect.engine.executors.LocalExecutor" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm willing to revert this, but with the known possibility for double execution in deep mapped pipelines, this felt safer. I actually thought we had already done this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally fine, not asking for reversion was just curious why
src/prefect/engine/executors/dask.py
Outdated
with dask.config.set(scheduler=self.scheduler, **self.kwargs) as cfg: | ||
yield cfg | ||
|
||
def queue(self, maxsize: int = 0) -> Queue: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not important for this PR, but are we using these queues anywhere? Can we kill them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can kill them in this PR, we are not using them anywhere!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although maybe a user is? I'll add this to breaking changes...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
Co-Authored-By: Jeremiah Lowin <153965+jlowin@users.noreply.github.com>
And yes, @jlowin this doesn't test the threaded executor. I could update all of our executor tests to parametrize over this if you'd like. My reasoning for it being "OK" (but maybe not 100% ideal) is that it's only changing dask's internal scheduling mechanism, and the fully distributed Dask executor tests for threading / process problems that could arise, so I didn't feel too uncomfortable with it. |
Fix issue where gather can fail when a task returns a pandas object
Thanks for contributing to Prefect!
Please describe your work and make sure your PR:
CHANGELOG.md
(if appropriate)docs/outline.toml
for API reference docs (if appropriate)Note that your PR will not be reviewed unless all three boxes are checked.
What does this PR change?
This PR implements a new
LocalDaskExecutor
for running Flows with various dask schedulers. In addition, now that this executor supersedesSynchronousExecutor
(and is easier to type), we will be deprecating theSynchronousExecutor
.Why is this PR important?
Closes #1336 and allows for more fine-tuning of how your workflows should be run.