Add SpecificationCluster #2675

Merged 58 commits on May 22, 2019
Changes from 49 commits
Commits
0666d46
Add SpecificationCluster
mrocklin May 8, 2019
f4c72e2
Pass in loop appropriately
mrocklin May 9, 2019
83088c8
Merge branch 'master' of github.com:dask/distributed into spec-cluster
mrocklin May 14, 2019
2ac4e6e
Merge branch 'master' into spec-cluster
mrocklin May 14, 2019
ab7b54f
Implement SpecCluster.scale
mrocklin May 14, 2019
ce17774
LocalCluster(SpecCluster)
mrocklin May 15, 2019
5edaf9d
support scale_up/down
mrocklin May 15, 2019
f7a52f7
handle Cluster.close returning a coroutine
mrocklin May 15, 2019
a4ea250
Merge branch 'master' of github.com:dask/distributed into spec-cluster
mrocklin May 15, 2019
07fcd63
await workers to trigger errors
mrocklin May 15, 2019
9c39fc6
Cleanup test_adaptive
mrocklin May 15, 2019
3fda334
Support silent logging
mrocklin May 15, 2019
757a123
cleanup remaining tests
mrocklin May 16, 2019
db4484a
Merge branch 'master' of github.com:dask/distributed into spec-cluster
mrocklin May 16, 2019
d3f8c59
cleanup name= handling
mrocklin May 16, 2019
f17665c
Cleanup the handling of nannies
mrocklin May 16, 2019
e573150
fix up nanny
mrocklin May 16, 2019
2ea0303
cleanup test
mrocklin May 16, 2019
99a883f
cleanup test_io_loop
mrocklin May 16, 2019
13ae0fa
add debug info in Nanny._instances check
mrocklin May 16, 2019
2307da5
Avoid very frequent calls to _correct_state
mrocklin May 16, 2019
3385a22
cleanup test_nanny.py::test_wait_for_scheduler
mrocklin May 16, 2019
6c8b01c
Add port=0 to scheduler tests
mrocklin May 16, 2019
ce74d04
cleanup test worker test_io_loop (again)
mrocklin May 16, 2019
cc91c43
cleanup test_worker_waits_for_scheduler
mrocklin May 16, 2019
af98aee
py35 compat
mrocklin May 16, 2019
cd4ba3d
Use port=0 for scheduler tests
mrocklin May 16, 2019
dd29cac
Use connection pool in Nanny
mrocklin May 16, 2019
3db0b2d
Fix test_bad_tasks_fail
mrocklin May 16, 2019
fb72f9a
don't check nannies at start
mrocklin May 16, 2019
6a489b5
parse_timedelta passes through None
mrocklin May 16, 2019
dd0bd0e
improve error reporting
mrocklin May 16, 2019
28167c2
Cleanup test_open_close_many_workers
mrocklin May 16, 2019
4691ea9
mild cleanup
mrocklin May 16, 2019
1594402
Merge branch 'master' of github.com:dask/distributed into spec-cluster
mrocklin May 16, 2019
eb4b496
add pytest-asyncio to tests
mrocklin May 17, 2019
55807fb
Add __del__ and cleanup test_client_timeout
mrocklin May 17, 2019
c5fffcd
Clean up spec closing and del
mrocklin May 17, 2019
36cbb18
port=0 on test_io_loop_periodic_callbacks
mrocklin May 17, 2019
474352f
Don't unregister the nanny
mrocklin May 17, 2019
060eea0
close client in test_file_descriptors
mrocklin May 17, 2019
2eae717
skip test_min_max in adaptive
mrocklin May 17, 2019
14811b6
Merge branch 'spec-cluster' of github.com:mrocklin/distributed into s…
mrocklin May 17, 2019
a8a5d11
Improve handling of cancellations in as_completed and gather
mrocklin May 17, 2019
304ade1
cleanup cluster startup and cleanup
mrocklin May 17, 2019
632112f
relax ws gc check
mrocklin May 17, 2019
46d465a
don't use processes in test_worker_params
mrocklin May 17, 2019
18ab7c5
xfail intermittent failure
mrocklin May 18, 2019
ea5a122
expand SpecCluster docstring
mrocklin May 20, 2019
87b3372
cleanup test_file_descriptors check on comms
mrocklin May 21, 2019
e1e0abf
allow missing bandwidth value
mrocklin May 21, 2019
8360352
Cleanup test_client_timeout
mrocklin May 21, 2019
c13b848
Add list of workers to assertion message
mrocklin May 21, 2019
34052e7
cleanup a bit after feedback
mrocklin May 21, 2019
949e310
Don't inject loop into workers
mrocklin May 21, 2019
5e94069
Add test for broken worker
mrocklin May 21, 2019
649a758
Test that bare LocalCluster starts up by default
mrocklin May 22, 2019
409f194
permit closed comms in test_file_descriptors
mrocklin May 22, 2019
2 changes: 1 addition & 1 deletion continuous_integration/setup_conda_environment.cmd
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ call activate %CONDA_ENV%
%PIP_INSTALL% git+https://github.com/joblib/joblib.git --upgrade
%PIP_INSTALL% git+https://github.com/dask/zict --upgrade

%PIP_INSTALL% pytest-repeat pytest-timeout pytest-faulthandler sortedcollections
%PIP_INSTALL% pytest-repeat pytest-timeout pytest-faulthandler sortedcollections pytest-asyncio

@rem Display final environment (for reproducing)
%CONDA% list
2 changes: 1 addition & 1 deletion continuous_integration/travis/install.sh
@@ -59,7 +59,7 @@ conda install -q \
conda install -c defaults -c conda-forge libunwind
conda install --no-deps -c defaults -c numba -c conda-forge stacktrace

pip install -q pytest-repeat pytest-faulthandler
pip install -q pytest-repeat pytest-faulthandler pytest-asyncio

pip install -q git+https://github.com/dask/dask.git --upgrade --no-deps
pip install -q git+https://github.com/joblib/joblib.git --upgrade --no-deps
2 changes: 1 addition & 1 deletion distributed/__init__.py
@@ -4,7 +4,7 @@
from dask.config import config
from .actor import Actor, ActorFuture
from .core import connect, rpc
from .deploy import LocalCluster, Adaptive
from .deploy import LocalCluster, Adaptive, SpecCluster
from .diagnostics import progress
from .client import (
Client,
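The export above makes `SpecCluster` part of the public API. Its central idea, per the PR title and the `_correct_state` commits, is that desired cluster state is described by a spec dictionary and a reconciliation step brings the running workers in line with it. A minimal stdlib-only sketch of that pattern (all class and method names here are illustrative, not the real dask API):

```python
import asyncio


class FakeWorker:
    """Stand-in worker; a real spec would point at dask's Worker or Nanny."""

    def __init__(self, **options):
        self.options = options


class SpecDrivenCluster:
    """Toy spec-driven cluster: desired state lives in ``worker_spec``;
    ``_correct_state`` reconciles the running workers against it."""

    def __init__(self, worker_spec):
        self.worker_spec = dict(worker_spec)  # name -> {"cls": ..., "options": {...}}
        self.workers = {}                     # name -> live worker object
        self._counter = 0

    async def _correct_state(self):
        for name in list(self.workers):       # stop workers dropped from the spec
            if name not in self.worker_spec:
                del self.workers[name]
        for name, spec in self.worker_spec.items():  # start newly specified workers
            if name not in self.workers:
                self.workers[name] = spec["cls"](**spec.get("options", {}))

    async def scale(self, n):
        while len(self.worker_spec) > n:      # scale down by trimming the spec
            self.worker_spec.popitem()
        while len(self.worker_spec) < n:      # scale up by extending it
            self.worker_spec["worker-%d" % self._counter] = {"cls": FakeWorker}
            self._counter += 1
        await self._correct_state()


async def demo():
    cluster = SpecDrivenCluster({"a": {"cls": FakeWorker, "options": {"nthreads": 2}}})
    await cluster._correct_state()
    assert len(cluster.workers) == 1
    await cluster.scale(3)
    assert len(cluster.workers) == 3
    await cluster.scale(1)
    assert sorted(cluster.workers) == ["a"]


asyncio.run(demo())
```

Scaling is then just an edit to the spec followed by reconciliation, which is what lets subclasses such as `LocalCluster` inherit `scale` for free.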
2 changes: 1 addition & 1 deletion distributed/cli/dask_worker.py
@@ -114,7 +114,7 @@
@click.option(
"--name",
type=str,
default="",
default=None,
help="A unique name for this worker like 'worker-1'. "
"If used with --nprocs then the process number "
"will be appended like name-0, name-1, name-2, ...",
24 changes: 18 additions & 6 deletions distributed/client.py
@@ -2,8 +2,8 @@

import atexit
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import DoneAndNotDoneFutures, CancelledError
from concurrent.futures import ThreadPoolExecutor, CancelledError
from concurrent.futures._base import DoneAndNotDoneFutures
from contextlib import contextmanager
import copy
from datetime import timedelta
@@ -44,6 +44,8 @@
from tornado.ioloop import IOLoop
from tornado.queues import Queue

from asyncio import iscoroutine

from .batched import BatchedSend
from .utils_comm import (
WrappedKey,
@@ -1301,7 +1303,13 @@ def close(self, timeout=no_default):

if self._start_arg is None:
with ignoring(AttributeError):
self.cluster.close()
f = self.cluster.close()
if iscoroutine(f):

async def _():
await f

self.sync(_)

sync(self.loop, self._close, fast=True)

@@ -1636,10 +1644,11 @@ def wait(k):
st = self.futures[key]
exception = st.exception
traceback = st.traceback
except (AttributeError, KeyError):
six.reraise(CancelledError, CancelledError(key), None)
except (KeyError, AttributeError):
exc = CancelledError(key)
else:
six.reraise(type(exception), exception, traceback)
raise exc
if errors == "skip":
bad_keys.add(key)
bad_data[key] = None
@@ -4059,7 +4068,10 @@ def _track_future(self, future):
except CancelledError:
pass
if self.with_results:
result = yield future._result(raiseit=False)
try:
result = yield future._result(raiseit=False)
except CancelledError as exc:
result = exc
with self.lock:
self.futures[future] -= 1
if not self.futures[future]:
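The `Client.close` change above copes with `Cluster.close` returning either `None` or a coroutine (see the "handle Cluster.close returning a coroutine" commit): it calls `close()`, inspects the result with `asyncio.iscoroutine`, and only then schedules it. The same pattern, sketched with illustrative classes:

```python
import asyncio
from asyncio import iscoroutine


class SyncCluster:
    def close(self):
        return "closed-sync"


class AsyncCluster:
    async def close(self):  # calling this returns a coroutine object
        return "closed-async"


def shut_down(cluster):
    """Close a cluster whose ``close`` may be a plain or coroutine function."""
    result = cluster.close()
    if iscoroutine(result):
        # The real Client hands this to self.sync(); here we simply run it.
        result = asyncio.run(result)
    return result


print(shut_down(SyncCluster()))   # closed-sync
print(shut_down(AsyncCluster()))  # closed-async
```

This keeps one synchronous call site working against both synchronous and asynchronous cluster implementations.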
1 change: 1 addition & 0 deletions distributed/comm/tcp.py
@@ -431,6 +431,7 @@ def start(self):
break
else:
raise exc
self.get_host_port() # trigger assignment to self.bound_address

def stop(self):
tcp_server, self.tcp_server = self.tcp_server, None
8 changes: 5 additions & 3 deletions distributed/core.py
@@ -489,14 +489,16 @@ def handle_stream(self, comm, extra=None, every_cycle=[]):

@gen.coroutine
def close(self):
self.listener.stop()
for pc in self.periodic_callbacks.values():
pc.stop()
if self.listener:
self.listener.stop()
for i in range(20): # let comms close naturally for a second
if not self._comms:
break
else:
yield gen.sleep(0.05)
for comm in self._comms:
comm.close()
yield [comm.close() for comm in self._comms]
for cb in self._ongoing_coroutines:
cb.cancel()
for i in range(10):
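The reworked `Server.close` above gives open comms up to a second (20 polls of 0.05 s) to drain on their own, then force-closes whatever remains, and now awaits those closes rather than firing and forgetting. A stdlib sketch of that drain-then-close loop (class and function names are illustrative):

```python
import asyncio


class FakeComm:
    """Illustrative comm that removes itself from the registry when closed."""

    def __init__(self, registry):
        self.registry = registry
        self.closed = False

    async def close(self):
        self.closed = True
        self.registry.discard(self)


async def drain_then_close(comms, poll=0.05, attempts=20):
    """Wait up to attempts * poll seconds for comms to close naturally,
    then force-close and await whatever is left."""
    for _ in range(attempts):
        if not comms:
            break
        await asyncio.sleep(poll)
    # Snapshot with list() because close() mutates the registry.
    await asyncio.gather(*(comm.close() for comm in list(comms)))


comms = set()
a, b = FakeComm(comms), FakeComm(comms)
comms.update({a, b})
asyncio.run(drain_then_close(comms, poll=0.001))
print(a.closed and b.closed)  # True
```

Awaiting the forced closes matters here: it keeps file descriptors from lingering past `close()`, which several of the `test_file_descriptors` cleanup commits in this PR depend on.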
1 change: 1 addition & 0 deletions distributed/deploy/__init__.py
@@ -4,6 +4,7 @@

from .cluster import Cluster
from .local import LocalCluster
from .spec import SpecCluster
from .adaptive import Adaptive

with ignoring(ImportError):
4 changes: 2 additions & 2 deletions distributed/deploy/adaptive.py
@@ -272,7 +272,7 @@ def _retire_workers(self, workers=None):

logger.info("Retiring workers %s", workers)
f = self.cluster.scale_down(workers)
if gen.is_future(f):
if hasattr(f, "__await__"):
yield f

raise gen.Return(workers)
@@ -354,7 +354,7 @@ def _adapt(self):
if status == "up":
f = self.cluster.scale_up(**recommendations)
self.log.append((time(), "up", recommendations))
if gen.is_future(f):
if hasattr(f, "__await__"):
yield f

elif status == "down":
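Replacing `gen.is_future(f)` with `hasattr(f, "__await__")` duck-types any awaitable, including plain coroutine objects that tornado's future check would miss, so `scale_up`/`scale_down` may be either synchronous methods or coroutine functions. A sketch of the pattern (`maybe_await` is an illustrative helper; `inspect.isawaitable` is the stdlib equivalent of the attribute check):

```python
import asyncio


async def maybe_await(result):
    """Await ``result`` only if it is awaitable; pass it through otherwise."""
    if hasattr(result, "__await__"):  # equivalently: inspect.isawaitable(result)
        result = await result
    return result


def sync_scale_down(workers):
    return len(workers)


async def async_scale_down(workers):  # calling this yields a coroutine object
    return len(workers)


async def demo():
    assert await maybe_await(sync_scale_down([1, 2])) == 2
    assert await maybe_await(async_scale_down([1, 2, 3])) == 3


asyncio.run(demo())
```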
32 changes: 31 additions & 1 deletion distributed/deploy/cluster.py
@@ -1,12 +1,23 @@
from datetime import timedelta
import logging
import os
from weakref import ref

import dask
from tornado import gen

from .adaptive import Adaptive

from ..utils import format_bytes, PeriodicCallback, log_errors, ignoring
from ..compatibility import get_thread_identity
from ..utils import (
format_bytes,
PeriodicCallback,
log_errors,
ignoring,
sync,
thread_state,
)


logger = logging.getLogger(__name__)

@@ -215,3 +226,22 @@ def update():

def _ipython_display_(self, **kwargs):
return self._widget()._ipython_display_(**kwargs)

@property
def asynchronous(self):
return (
self._asynchronous
or getattr(thread_state, "asynchronous", False)
or hasattr(self.loop, "_thread_identity")
and self.loop._thread_identity == get_thread_identity()
)

def sync(self, func, *args, **kwargs):
if kwargs.pop("asynchronous", None) or self.asynchronous:
callback_timeout = kwargs.pop("callback_timeout", None)
future = func(*args, **kwargs)
if callback_timeout is not None:
future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
return future
else:
return sync(self.loop, func, *args, **kwargs)
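The new `Cluster.sync` above either hands the awaitable back to the caller (async mode) or blocks on the event loop (sync mode). A simplified, stdlib-only sketch of that dispatch — the real method also consults `thread_state` and the loop's thread identity, and blocks via `utils.sync` on a tornado loop running in another thread; here `asyncio.run` stands in:

```python
import asyncio


class MiniCluster:
    """Simplified dispatch modeled on Cluster.sync (illustrative only)."""

    def __init__(self, asynchronous=False):
        self._asynchronous = asynchronous

    @property
    def asynchronous(self):
        return self._asynchronous

    def sync(self, func, *args, **kwargs):
        if kwargs.pop("asynchronous", None) or self.asynchronous:
            return func(*args, **kwargs)           # hand the awaitable back
        return asyncio.run(func(*args, **kwargs))  # block until complete


async def get_name(name):
    return {"name": name}


async def async_caller():
    cluster = MiniCluster(asynchronous=True)
    return await cluster.sync(get_name, "a")       # awaitable comes back


print(asyncio.run(async_caller()))        # {'name': 'a'}
print(MiniCluster().sync(get_name, "b"))  # {'name': 'b'}
```

One method thus serves both `async with SpecCluster(...)` usage and blocking scripts, which is the same convention `Client` already follows.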