diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index 57a7168a3a2..3668be684d0 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -188,29 +188,15 @@ def del_pid_file(): loop = IOLoop.current() logger.info("-" * 47) - services = {} - if _bokeh: - try: - from distributed.bokeh.scheduler import BokehScheduler - - services[("bokeh", dashboard_address)] = ( - BokehScheduler, - {"prefix": bokeh_prefix}, - ) - except ImportError as error: - if str(error).startswith("No module named"): - logger.info("Web dashboard not loaded. Unable to import bokeh") - else: - logger.info("Unable to import bokeh: %s" % str(error)) - scheduler = Scheduler( loop=loop, - services=services, scheduler_file=scheduler_file, security=sec, host=host, port=port, interface=interface, + dashboard_address=dashboard_address if _bokeh else None, + service_kwargs={"bokeh": {"prefix": bokeh_prefix}}, ) scheduler.start() if not preload: diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 1448395d109..e383095b382 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -163,7 +163,7 @@ default=None, help="Seconds to wait for a scheduler before closing", ) -@click.option("--bokeh-prefix", type=str, default=None, help="Prefix for the bokeh app") +@click.option("--bokeh-prefix", type=str, default="", help="Prefix for the bokeh app") @click.option( "--preload", type=str, @@ -288,18 +288,6 @@ def del_pid_file(): services = {} - if bokeh: - try: - from distributed.bokeh.worker import BokehWorker - except ImportError: - pass - else: - if bokeh_prefix: - result = (BokehWorker, {"prefix": bokeh_prefix}) - else: - result = BokehWorker - services[("bokeh", dashboard_address)] = result - if resources: resources = resources.replace(",", " ").split() resources = dict(pair.split("=") for pair in resources) @@ -350,6 +338,8 @@ def del_pid_file(): interface=interface, host=host, port=port, + dashboard_address=dashboard_address if bokeh else None, + service_kwargs={"bokhe": {"prefix": bokeh_prefix}}, name=name if nprocs == 1 or not name else name + "-" + str(i), **kwargs ) diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index 3431210a645..fb8793d0840 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -57,7 +57,7 @@ class LocalCluster(Cluster): Address on which to listen for the Bokeh diagnostics server like 'localhost:8787' or '0.0.0.0:8787'. Defaults to ':8787'. Set to ``None`` to disable the dashboard. - Use port 0 for a random port. + Use ':0' for a random port. diagnostics_port: int Deprecated. See dashboard_address. asynchronous: bool (False by default) @@ -112,6 +112,7 @@ def __init__( scheduler_port=0, silence_logs=logging.WARN, dashboard_address=":8787", + worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, @@ -179,29 +180,23 @@ def __init__( worker_kwargs["memory_limit"] = parse_memory_limit("auto", 1, n_workers) worker_kwargs.update( - {"ncores": threads_per_worker, "services": worker_services} + { + "ncores": threads_per_worker, + "services": worker_services, + "dashboard_address": worker_dashboard_address, + } ) self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) self.loop = self._loop_runner.loop - if dashboard_address is not False and dashboard_address is not None: - try: - from distributed.bokeh.scheduler import BokehScheduler - from distributed.bokeh.worker import BokehWorker - except ImportError: - logger.debug("To start diagnostics web server please install Bokeh") - else: - services[("bokeh", dashboard_address)] = ( - BokehScheduler, - (service_kwargs or {}).get("bokeh", {}), - ) - worker_services[("bokeh", 0)] = BokehWorker - self.scheduler = Scheduler( loop=self.loop, services=services, + service_kwargs=service_kwargs, security=security, + interface=interface, + dashboard_address=dashboard_address, blocked_handlers=blocked_handlers, ) self.scheduler_port = scheduler_port diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8ba4cedf468..8500150204e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -818,6 +818,7 @@ def __init__( delete_interval="500ms", synchronize_worker_interval="60s", services=None, + service_kwargs=None, allowed_failures=ALLOWED_FAILURES, extensions=None, validate=False, @@ -829,6 +830,7 @@ def __init__( host=None, port=8786, protocol=None, + dashboard_address=None, **kwargs ): self._setup_logging() @@ -862,6 +864,17 @@ def __init__( self.connection_args = self.security.get_connection_args("scheduler") self.listen_args = self.security.get_listen_args("scheduler") + if dashboard_address is not None: + try: + from distributed.bokeh.scheduler import BokehScheduler + except ImportError: + logger.debug("To start diagnostics web server please install Bokeh") + else: + self.service_specs[("bokeh", dashboard_address)] = ( + BokehScheduler, + (service_kwargs or {}).get("bokeh", {}), + ) + # Communication state self.loop = loop or IOLoop.current() self.client_comms = dict() diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 805b0e06ed0..ec229dd3187 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -822,7 +822,7 @@ def test_file_descriptors(c, s): yield [n.close() for n in nannies] assert not s.rpc.open - assert not c.rpc.active, list(c.rpc._created) + assert not any(occ for addr, occ in c.rpc.occupied.items() if occ != s.address) assert not s.stream_comms start = time() diff --git a/distributed/worker.py b/distributed/worker.py index a1846d85539..1b103fe144a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -290,6 +290,7 @@ def __init__( local_dir="dask-worker-space", services=None, service_ports=None, + service_kwargs=None, name=None, reconnect=True, memory_limit="auto", @@ -309,6 +310,7 @@ def __init__( host=None, port=None, protocol=None, + dashboard_address=None, low_level_profiler=dask.config.get("distributed.worker.profile.low-level"), **kwargs ): @@ -535,6 +537,18 @@ def __init__( self.services = {} self.service_ports = service_ports or {} self.service_specs = services or {} + + if dashboard_address is not None: + try: + from distributed.bokeh.worker import BokehWorker + except ImportError: + logger.debug("To start diagnostics web server please install Bokeh") + else: + self.service_specs[("bokeh", dashboard_address)] = ( + BokehWorker, + (service_kwargs or {}).get("bokeh", {}), + ) + self.metrics = dict(metrics) if metrics else {} self.low_level_profiler = low_level_profiler