Skip to content

Commit

Permalink
Merge pull request #1434 from PrefectHQ/new-exec
Browse files Browse the repository at this point in the history
New executor
  • Loading branch information
cicdw committed Aug 30, 2019
2 parents 337bf2d + b125de7 commit 8f8432c
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 138 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

- Added Local, Kubernetes, and Nomad agents - [#1341](https://github.com/PrefectHQ/prefect/pull/1341)
- Add the ability for Tasks to sequentially loop - [#1356](https://github.com/PrefectHQ/prefect/pull/1356)
- - Add `AzureResultHandler` for handling results to / from Azure Blob storage containers - [#1421](https://github.com/PrefectHQ/prefect/pull/1421)

### Enhancements

Expand All @@ -27,6 +26,8 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Support persistent `scheduled_start_time` for scheduled flow runs when run locally with `flow.run()` - [#1418](https://github.com/PrefectHQ/prefect/pull/1418), [#1429](https://github.com/PrefectHQ/prefect/pull/1429)
- Add `task_args` to `Task.map` - [#1390](https://github.com/PrefectHQ/prefect/issues/1390)
- Add auth flows for `USER`-scoped Cloud API tokens - [#1423](https://github.com/PrefectHQ/prefect/pull/1423)
- Add `AzureResultHandler` for handling results to / from Azure Blob storage containers - [#1421](https://github.com/PrefectHQ/prefect/pull/1421)
- Add new configurable `LocalDaskExecutor` - [#1336](https://github.com/PrefectHQ/prefect/issues/1336)
- Add CLI commands for working with Prefect Cloud auth - [#1431](https://github.com/PrefectHQ/prefect/pull/1431)

### Task Library
Expand All @@ -40,11 +41,12 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

### Deprecations

- None
- Rename `SynchronousExecutor` as `LocalDaskExecutor` - [#1434](https://github.com/PrefectHQ/prefect/pull/1434)

### Breaking Changes

- Rename `CloudEnvironment` to `DaskKubernetesEnvironment` - [#1250](https://github.com/PrefectHQ/prefect/issues/1250)
- Remove unused `queue` method from all executors - [#1434](https://github.com/PrefectHQ/prefect/pull/1434)

### Contributors

Expand Down
1 change: 1 addition & 0 deletions docs/outline.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ module = "prefect.engine.executors"
classes = [
"Executor",
"DaskExecutor",
"LocalDaskExecutor",
"LocalExecutor",
"SynchronousExecutor"]

Expand Down
2 changes: 1 addition & 1 deletion src/prefect/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ checkpointing = false
[engine.executor]

# the default executor, specified using a full path
default_class = "prefect.engine.executors.SynchronousExecutor"
default_class = "prefect.engine.executors.LocalExecutor"

[engine.executor.dask]
# the default scheduler address for the DaskExecutor. Set to "local" to configure
Expand Down
6 changes: 2 additions & 4 deletions src/prefect/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ def get_default_executor_class() -> type:
except ValueError:
warn(
"Could not import {}; using "
"prefect.engine.executors.SynchronousExecutor instead.".format(
config_value
)
"prefect.engine.executors.LocalExecutor instead.".format(config_value)
)
return prefect.engine.executors.SynchronousExecutor
return prefect.engine.executors.LocalExecutor
else:
return config_value

Expand Down
8 changes: 4 additions & 4 deletions src/prefect/engine/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
- `LocalExecutor`: the no frills, straightforward executor - great for simple
debugging; tasks are executed immediately upon being called by `executor.submit()`.
Note that the `LocalExecutor` is not capable of parallelism.
- `SynchronousExecutor`: an executor that runs on `dask` primitives with the
synchronous dask scheduler; currently the default executor
Note that the `LocalExecutor` is not capable of parallelism. Currently the default executor.
- `LocalDaskExecutor`: an executor that runs on `dask` primitives with a
configurable dask scheduler.
- `DaskExecutor`: the most feature-rich of the executors, this executor runs
on `dask.distributed` and has support for multiprocessing, multithreading, and distributed execution.
Expand All @@ -27,6 +27,6 @@
"""
import prefect
from prefect.engine.executors.base import Executor
from prefect.engine.executors.dask import DaskExecutor
from prefect.engine.executors.dask import DaskExecutor, LocalDaskExecutor
from prefect.engine.executors.local import LocalExecutor
from prefect.engine.executors.sync import SynchronousExecutor
12 changes: 0 additions & 12 deletions src/prefect/engine/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,3 @@ def wait(self, futures: Any) -> Any:
- Any: an iterable of resolved futures
"""
raise NotImplementedError()

def queue(self, maxsize: int = 0) -> Any:
"""
Creates an executor-compatible Queue object that can share state across tasks.
Args:
- maxsize (int): maxsize of the queue; defaults to 0 (infinite)
Returns:
- Queue: an executor compatible queue that can be shared among tasks
"""
raise NotImplementedError()
87 changes: 72 additions & 15 deletions src/prefect/engine/executors/dask.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dask
import datetime
import logging
import queue
Expand All @@ -6,7 +7,7 @@
from contextlib import contextmanager
from typing import Any, Callable, Iterable, Iterator, List

from distributed import Client, Future, Queue, fire_and_forget, worker_client
from distributed import Client, Future, fire_and_forget, worker_client

from prefect import config, context
from prefect.engine.executors.base import Executor
Expand Down Expand Up @@ -102,20 +103,6 @@ def _prep_dask_kwargs(self) -> dict:

return dask_kwargs

def queue(self, maxsize: int = 0, client: Client = None) -> Queue:
"""
Creates an executor-compatible Queue object that can share state
across tasks.
Args:
- maxsize (int, optional): `maxsize` for the Queue; defaults to 0
(interpreted as no size limitation)
- client (dask.distributed.Client, optional): which client to
associate the Queue with; defaults to `self.client`
"""
q = Queue(maxsize=maxsize, client=client or self.client)
return q

def __getstate__(self) -> dict:
state = self.__dict__.copy()
if "client" in state:
Expand Down Expand Up @@ -201,3 +188,73 @@ def wait(self, futures: Any) -> Any:
return client.gather(futures)
else:
raise ValueError("This executor has not been started.")


class LocalDaskExecutor(Executor):
"""
An executor that runs all functions locally using `dask` and a configurable dask scheduler. Note that
this executor is known to occasionally run tasks twice when using multi-level mapping.
Args:
- scheduler (str): The local dask scheduler to use; common options are "synchronous", "threads" and "processes". Defaults to "synchronous".
- **kwargs (Any): Additional keyword arguments to pass to dask config
"""

def __init__(self, scheduler: str = "synchronous", **kwargs: Any):
self.scheduler = scheduler
self.kwargs = kwargs
super().__init__()

@contextmanager
def start(self) -> Iterator:
"""
Context manager for initializing execution.
Configures `dask` and yields the `dask.config` contextmanager.
"""
with dask.config.set(scheduler=self.scheduler, **self.kwargs) as cfg:
yield cfg

def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> dask.delayed:
"""
Submit a function to the executor for execution. Returns a `dask.delayed` object.
Args:
- fn (Callable): function that is being submitted for execution
- *args (Any): arguments to be passed to `fn`
- **kwargs (Any): keyword arguments to be passed to `fn`
Returns:
- dask.delayed: a `dask.delayed` object that represents the computation of `fn(*args, **kwargs)`
"""
return dask.delayed(fn)(*args, **kwargs)

def map(self, fn: Callable, *args: Any) -> List[dask.delayed]:
"""
Submit a function to be mapped over its iterable arguments.
Args:
- fn (Callable): function that is being submitted for execution
- *args (Any): arguments that the function will be mapped over
Returns:
- List[dask.delayed]: the result of computating the function over the arguments
"""
results = []
for args_i in zip(*args):
results.append(self.submit(fn, *args_i))
return results

def wait(self, futures: Any) -> Any:
"""
Resolves a `dask.delayed` object to its values. Blocks until the computation is complete.
Args:
- futures (Any): iterable of `dask.delayed` objects to compute
Returns:
- Any: an iterable of resolved futures
"""
with dask.config.set(scheduler=self.scheduler, **self.kwargs) as cfg:
return dask.compute(futures)[0]
71 changes: 11 additions & 60 deletions src/prefect/engine/executors/sync.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,26 @@
import datetime
import warnings
from contextlib import contextmanager
from queue import Queue
from typing import Any, Callable, Iterable, Iterator, List

import dask
import dask.bag

from prefect.engine.executors.base import Executor
from prefect.engine.executors.dask import LocalDaskExecutor


class SynchronousExecutor(Executor):
class SynchronousExecutor(LocalDaskExecutor):
"""
An executor that runs all functions synchronously using `dask`. Note that
this executor is known to occasionally run tasks twice when using multi-level mapping.
"""

@contextmanager
def start(self) -> Iterator:
"""
Context manager for initializing execution.
Configures `dask` and yields the `dask.config` contextmanager.
"""
with dask.config.set(scheduler="synchronous") as cfg:
yield cfg

def queue(self, maxsize: int = 0) -> Queue:
q = Queue(maxsize=maxsize) # type: Queue
return q

def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> dask.delayed:
"""
Submit a function to the executor for execution. Returns a `dask.delayed` object.
Args:
- fn (Callable): function that is being submitted for execution
- *args (Any): arguments to be passed to `fn`
- **kwargs (Any): keyword arguments to be passed to `fn`
Returns:
- dask.delayed: a `dask.delayed` object that represents the computation of `fn(*args, **kwargs)`
"""
return dask.delayed(fn)(*args, **kwargs)
def map(self, fn: Callable, *args: Any) -> List[dask.delayed]:
"""
Submit a function to be mapped over its iterable arguments.
Args:
- fn (Callable): function that is being submitted for execution
- *args (Any): arguments that the function will be mapped over
Returns:
- List[dask.delayed]: the result of computating the function over the arguments
"""
results = []
for args_i in zip(*args):
results.append(self.submit(fn, *args_i))
return results

def wait(self, futures: Any) -> Any:
"""
Resolves a `dask.delayed` object to its values. Blocks until the computation is complete.
Args:
- futures (Any): iterable of `dask.delayed` objects to compute
NOTE: this class is deprecated and maintained only for backwards-compatibility.
"""

Returns:
- Any: an iterable of resolved futures
"""
with dask.config.set(scheduler="synchronous"):
return dask.compute(futures)[0]
def __init__(self) -> None:
warnings.warn(
"The SynchronousExecutor is deprecated and will be removed from "
"Prefect. Use a LocalDaskExecutor with a 'synchronous' scheduler instead.",
UserWarning,
)
super().__init__(scheduler="synchronous")
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from distributed import Client

import prefect
from prefect.engine.executors import DaskExecutor, LocalExecutor, SynchronousExecutor
from prefect.engine.executors import DaskExecutor, LocalExecutor, LocalDaskExecutor
from prefect.utilities import debug, configuration


Expand Down Expand Up @@ -44,7 +44,7 @@ def local():
@pytest.fixture()
def sync():
"Synchronous dask (not dask.distributed) executor"
yield SynchronousExecutor()
yield LocalDaskExecutor()


@pytest.fixture(scope="session")
Expand Down
Loading

0 comments on commit 8f8432c

Please sign in to comment.