Skip to content

Commit

Permalink
Merge pull request #151 from PrefectHQ/dask-upgrade
Browse files Browse the repository at this point in the history
Dask upgrade
  • Loading branch information
cicdw committed Aug 22, 2018
2 parents 74ba05e + dc20538 commit 090178f
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 12 deletions.
4 changes: 4 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ jobs:
name: Install Prefect
command: pip install -e ".[dev]"

- run:
name: Install zsh for tests
command: apt-get update && apt-get install -y zsh

- run:
name: Auto-generate markdown
command: |
Expand Down
12 changes: 10 additions & 2 deletions src/prefect/engine/executors/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,24 @@
class DaskExecutor(Executor):
"""
An executor that runs all functions synchronously using `dask`.
Args:
- scheduler (string, optional): which dask scheduler to use; defaults to
`"synchronous"`. Other available options are `"threads"` for multithreading and `"processes"` for multiprocessing.
"""

def __init__(self, scheduler="synchronous"):
self.scheduler = scheduler
super().__init__()

@contextmanager
def start(self) -> Iterable[None]:
"""
Context manager for initializing execution.
Configures `dask` to run synchronously and yields the `dask.config` contextmanager.
Configures `dask` to run using the provided scheduler and yields the `dask.config` contextmanager.
"""
with dask.config.set(scheduler="synchronous") as cfg:
with dask.config.set(scheduler=self.scheduler) as cfg:
yield cfg

def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> dask.delayed:
Expand Down
17 changes: 17 additions & 0 deletions tests/engine/executors/test_executors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import time

import prefect
from prefect.engine.executors import DaskExecutor, Executor, LocalExecutor
Expand Down Expand Up @@ -82,3 +83,19 @@ def test_submit_with_context_requires_context_kwarg(self):
with pytest.raises(TypeError) as exc:
DaskExecutor().submit_with_context(lambda: 1)
assert "missing 1 required keyword-only argument: 'context'" in str(exc.value)

@pytest.mark.parametrize("scheduler", ["threads", "processes"])
def test_executor_implements_parallelism(self, scheduler):
executor = DaskExecutor(scheduler=scheduler)

@prefect.task
def timed():
time.sleep(0.25)
return time.time()

with prefect.Flow() as f:
a, b = timed(), timed()

res = f.run(executor=executor, return_tasks=f.tasks)
times = [s.result for t, s in res.result.items()]
assert abs(times[0] - times[1]) < 0.25
29 changes: 19 additions & 10 deletions tests/engine/test_flow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,17 @@
from prefect.utilities.tests import raise_on_exception


@pytest.fixture(params=[DaskExecutor, LocalExecutor])
@pytest.fixture(
params=[
DaskExecutor(scheduler="synchronous"),
DaskExecutor(scheduler="threads"),
DaskExecutor(scheduler="processes"),
LocalExecutor(),
],
ids=["dask-sync", "dask-threads", "dask-process", "local"],
)
def executor(request):
return request.param()
return request.param


class SuccessTask(Task):
Expand Down Expand Up @@ -78,13 +86,8 @@ def run(self):


class ReturnTask(Task):
called = False

def run(self, x):
if self.called is False:
self.called = True
raise ValueError("Must call twice.")
return x
return 1 / (x - 1)


def test_flow_runner_runs_basic_flow_with_1_task():
Expand Down Expand Up @@ -414,12 +417,14 @@ def test_retries_use_cached_inputs(self, executor):

first_state = FlowRunner(flow=f).run(executor=executor, return_tasks=[res])
assert isinstance(first_state, Pending)
b_state = first_state.result[res]
b_state.cached_inputs = dict(x=2) # artificially alter state
with raise_on_exception(): # without caching we'd expect a KeyError
second_state = FlowRunner(flow=f).run(
executor=executor,
return_tasks=[res],
start_tasks=[res],
task_states={res: first_state.result[res]},
task_states={res: b_state},
)
assert isinstance(second_state, Success)
assert second_state.result[res].result == 1
Expand Down Expand Up @@ -449,9 +454,13 @@ def test_retries_caches_parameters_as_well(self, executor):
executor=executor, parameters=dict(x=1), return_tasks=[res]
)
assert isinstance(first_state, Pending)

res_state = first_state.result[res]
res_state.cached_inputs = dict(x=2) # artificially alter state

second_state = FlowRunner(flow=f).run(
executor=executor,
parameters=dict(x=2),
parameters=dict(x=1),
return_tasks=[res],
start_tasks=[res],
task_states={res: first_state.result[res]},
Expand Down
4 changes: 4 additions & 0 deletions tests/tasks/test_shell.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import pytest
import subprocess

from prefect import Flow
from prefect.engine import signals
Expand All @@ -14,6 +15,9 @@ def test_shell_initializes_and_runs_basic_cmd():
assert out.result[task].result == b"hello world"


@pytest.mark.skipif(
subprocess.check_output(["which", "zsh"]) == b"", reason="zsh not installed."
)
def test_shell_runs_other_shells():
with Flow() as f:
task = ShellTask(shell="zsh")(command="echo -n $ZSH_NAME")
Expand Down

0 comments on commit 090178f

Please sign in to comment.