diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4f3c8520063..dd5cca1e43d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -13,6 +13,7 @@ import random import six import warnings +import socket import psutil import sortedcontainers @@ -1272,13 +1273,36 @@ def add_worker(self, comm=None, address=None, keys=(), ncores=None, metrics=None, pid=0, services=None, local_directory=None): """ Add a new worker to the cluster """ with log_errors(): - address = self.coerce_address(address, resolve_address) + try: + address = self.coerce_address(address, resolve_address) + except socket.gaierror as e: + # handle exceptions if hostname isn't found in DNS lookup, + # perhaps because DNS propagation is slow + msg = {'status': 'error', + 'message': 'address not found, %s' % address, + 'time': time()} + yield comm.write(msg) + return address = normalize_address(address) host = get_address_host(address) ws = self.workers.get(address) if ws is not None: - raise ValueError("Worker already exists %s" % address) + self.remove_worker(address=address) + msg = {'status': 'error', + 'message': "worker already exists %s" % address, + 'time': time()} + yield comm.write(msg) + return + + # DD + + if name in self.aliases: + msg = {'status': 'error', + 'message': 'name taken, %s' % name, + 'time': time()} + yield comm.write(msg) + return self.workers[address] = ws = WorkerState( address=address, @@ -1290,13 +1314,6 @@ def add_worker(self, comm=None, address=None, keys=(), ncores=None, services=services ) - if name in self.aliases: - msg = {'status': 'error', - 'message': 'name taken, %s' % name, - 'time': time()} - yield comm.write(msg) - return - if 'addresses' not in self.host_info[host]: self.host_info[host].update({'addresses': set(), 'cores': 0}) @@ -4118,8 +4135,6 @@ def coerce_address(self, addr, resolve=True): Handles strings, tuples, or aliases. """ # XXX how many address-parsing routines do we have? 
- if addr in self.aliases: - addr = self.aliases[addr] if isinstance(addr, tuple): addr = unparse_host_port(*addr) if not isinstance(addr, six.string_types): diff --git a/distributed/utils.py b/distributed/utils.py index 306106bb0a9..5c31055860e 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -683,7 +683,7 @@ def silence_logging(level, root='distributed'): return old -@toolz.memoize +#@toolz.memoize def ensure_ip(hostname): """ Ensure that address is an IP address diff --git a/distributed/worker.py b/distributed/worker.py index 645ef40c16e..979b647f96f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -596,6 +596,11 @@ def _register_with_scheduler(self): continue future = gen.with_timeout(timedelta(seconds=diff), future) response = yield future + if response['status'] == 'error': + msg = response.get('message','') + if msg.startswith('name taken') or msg.startswith('address not found') or msg.startswith('worker already exists'): + yield gen.sleep(0.1) + continue _end = time() middle = (_start + _end) / 2 self.scheduler_delay = response['time'] - middle