Skip to content
Merged
31 changes: 14 additions & 17 deletions distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import gc
import os
import re
import shutil
import sys
import tempfile
Expand All @@ -16,12 +17,7 @@

from distributed import Scheduler
from distributed.security import Security
from distributed.utils import get_ip_interface
from distributed.cli.utils import (
check_python_3,
install_signal_handlers,
uri_from_host_port,
)
from distributed.cli.utils import check_python_3, install_signal_handlers
from distributed.preloading import preload_modules, validate_preload_argv
from distributed.proctitle import (
enable_proctitle_on_children,
Expand Down Expand Up @@ -151,6 +147,9 @@ def main(
)
dashboard_address = bokeh_port

if port is None and (not host or not re.search(r":\d", host)):
port = 8786

sec = Security(
tls_ca_file=tls_ca_file, tls_scheduler_cert=tls_cert, tls_scheduler_key=tls_key
)
Expand Down Expand Up @@ -186,14 +185,6 @@ def del_pid_file():
limit = max(soft, hard // 2)
resource.setrlimit(resource.RLIMIT_NOFILE, (limit, hard))

if interface:
if host:
raise ValueError("Can not specify both interface and host")
else:
host = get_ip_interface(interface)

addr = uri_from_host_port(host, port, 8786)

loop = IOLoop.current()
logger.info("-" * 47)

Expand All @@ -213,9 +204,15 @@ def del_pid_file():
logger.info("Unable to import bokeh: %s" % str(error))

scheduler = Scheduler(
loop=loop, services=services, scheduler_file=scheduler_file, security=sec
loop=loop,
services=services,
scheduler_file=scheduler_file,
security=sec,
host=host,
port=port,
interface=interface,
)
scheduler.start(addr)
scheduler.start()
if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
Expand All @@ -237,7 +234,7 @@ def del_pid_file():
if local_directory_created:
shutil.rmtree(local_directory)

logger.info("End scheduler at %r", addr)
logger.info("End scheduler at %r", scheduler.address)


def go():
Expand Down
25 changes: 6 additions & 19 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,10 @@
import click
from distributed import Nanny, Worker
from distributed.config import config
from distributed.utils import get_ip_interface, parse_timedelta
from distributed.utils import parse_timedelta
from distributed.worker import _ncores
from distributed.security import Security
from distributed.cli.utils import (
check_python_3,
uri_from_host_port,
install_signal_handlers,
)
from distributed.cli.utils import check_python_3, install_signal_handlers
from distributed.comm import get_address_host_port
from distributed.preloading import validate_preload_argv
from distributed.proctitle import (
Expand Down Expand Up @@ -328,18 +324,6 @@ def del_pid_file():
"dask-worker SCHEDULER_ADDRESS:8786"
)

if interface:
if host:
raise ValueError("Can not specify both interface and host")
else:
host = get_ip_interface(interface)

if host or port:
addr = uri_from_host_port(host, port, 0)
else:
# Choose appropriate address for scheduler
addr = None

if death_timeout is not None:
death_timeout = parse_timedelta(death_timeout, "s")

Expand All @@ -359,6 +343,9 @@ def del_pid_file():
preload_argv=preload_argv,
security=sec,
contact_address=contact_address,
interface=interface,
host=host,
port=port,
name=name if nprocs == 1 or not name else name + "-" + str(i),
**kwargs
)
Expand All @@ -377,7 +364,7 @@ def on_signal(signum):

@gen.coroutine
def run():
yield [n._start(addr) for n in nannies]
yield nannies
while all(n.status != "closed" for n in nannies):
yield gen.sleep(0.2)

Expand Down
54 changes: 0 additions & 54 deletions distributed/cli/tests/test_cli_utils.py

This file was deleted.

3 changes: 1 addition & 2 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import requests
import sys
from time import sleep
from toolz import first

from distributed import Client
from distributed.metrics import time
Expand Down Expand Up @@ -52,7 +51,7 @@ def test_memory_limit(loop):
while not c.ncores():
sleep(0.1)
info = c.scheduler_info()
d = first(info["workers"].values())
[d] = info["workers"].values()
assert isinstance(d["memory_limit"], int)
assert d["memory_limit"] == 2e9

Expand Down
40 changes: 0 additions & 40 deletions distributed/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@
from tornado import gen
from tornado.ioloop import IOLoop

from distributed.comm import (
parse_address,
unparse_address,
parse_host_port,
unparse_host_port,
)


py3_err_msg = """
Warning: Your terminal does not set locales.
Expand Down Expand Up @@ -75,36 +68,3 @@ def cleanup_and_stop():

for sig in [signal.SIGINT, signal.SIGTERM]:
old_handlers[sig] = signal.signal(sig, handle_signal)


def uri_from_host_port(host_arg, port_arg, default_port):
"""
Process the *host* and *port* CLI options.
Return a URI.
"""
# Much of distributed depends on a well-known IP being assigned to
# each entity (Worker, Scheduler, etc.), so avoid "universal" addresses
# like '' which would listen on all registered IPs and interfaces.
scheme, loc = parse_address(host_arg or "")

host, port = parse_host_port(
loc, port_arg if port_arg is not None else default_port
)

if port is None and port_arg is None:
port_arg = default_port

if port and port_arg and port != port_arg:
raise ValueError(
"port number given twice in options: "
"host %r and port %r" % (host_arg, port_arg)
)
if port is None and port_arg is not None:
port = port_arg
# Note `port = 0` means "choose a random port"
if port is None:
port = default_port
loc = unparse_host_port(host, port)
addr = unparse_address(scheme, loc)

return addr
69 changes: 69 additions & 0 deletions distributed/comm/addressing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dask

from . import registry
from ..utils import get_ip_interface


DEFAULT_SCHEME = dask.config.get("distributed.comm.default-scheme")
Expand Down Expand Up @@ -172,3 +173,71 @@ def resolve_address(addr):
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
return unparse_address(scheme, backend.resolve_address(loc))


def uri_from_host_port(host_arg, port_arg, default_port):
"""
Process the *host* and *port* CLI options.
Return a URI.
"""
# Much of distributed depends on a well-known IP being assigned to
# each entity (Worker, Scheduler, etc.), so avoid "universal" addresses
# like '' which would listen on all registered IPs and interfaces.
scheme, loc = parse_address(host_arg or "")

host, port = parse_host_port(
loc, port_arg if port_arg is not None else default_port
)

if port is None and port_arg is None:
port_arg = default_port

if port and port_arg and port != port_arg:
raise ValueError(
"port number given twice in options: "
"host %r and port %r" % (host_arg, port_arg)
)
if port is None and port_arg is not None:
port = port_arg
# Note `port = 0` means "choose a random port"
if port is None:
port = default_port
loc = unparse_host_port(host, port)
addr = unparse_address(scheme, loc)

return addr


def address_from_user_args(
host=None, port=None, interface=None, protocol=None, peer=None, security=None
):
""" Get an address to listen on from common user provided arguments """
if security and security.require_encryption and not protocol:
protocol = "tls"

if protocol and protocol.rstrip("://") == "inplace":
if host or port or interface:
raise ValueError(
"Can not specify inproc protocol and host or port or interface"
)
else:
return "inproc://"

if interface:
if host:
raise ValueError("Can not specify both interface and host", interface, host)
else:
host = get_ip_interface(interface)

if protocol and host and "://" not in host:
host = protocol.rstrip("://") + "://" + host

if host or port:
addr = uri_from_host_port(host, port, 0)
else:
addr = ""

if protocol and "://" not in addr:
addr = protocol.rstrip("://") + "://" + addr

return addr
15 changes: 15 additions & 0 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from tornado.locks import Event

from .comm import get_address_host, get_local_address_for, unparse_host_port
from .comm.addressing import address_from_user_args
from .core import rpc, RPCClosed, CommClosedError, coerce_to_address
from .metrics import time
from .node import ServerNode
Expand Down Expand Up @@ -69,6 +70,10 @@ def __init__(
listen_address=None,
worker_class=None,
env=None,
interface=None,
host=None,
port=None,
protocol=None,
**worker_kwargs
):

Expand Down Expand Up @@ -135,6 +140,14 @@ def __init__(
pc = PeriodicCallback(self.memory_monitor, 100, io_loop=self.loop)
self.periodic_callbacks["memory"] = pc

self._start_address = address_from_user_args(
host=host,
port=port,
interface=interface,
protocol=protocol,
security=security,
)

self._listen_address = listen_address
self.status = "init"

Expand Down Expand Up @@ -175,6 +188,7 @@ def worker_dir(self):
@gen.coroutine
def _start(self, addr_or_port=0):
""" Start nanny, start local process, start watching """
addr_or_port = addr_or_port or self._start_address

# XXX Factor this out
if not addr_or_port:
Expand Down Expand Up @@ -419,6 +433,7 @@ def start(self):

self.process = AsyncProcess(
target=self._run,
name="Dask Worker process (from Nanny)",
kwargs=dict(
worker_args=self.worker_args,
worker_kwargs=self.worker_kwargs,
Expand Down
Loading