From 7709ad7c76f4193205956f79cc186d402e2356dd Mon Sep 17 00:00:00 2001 From: darindf Date: Tue, 2 Apr 2019 11:40:40 -0700 Subject: [PATCH 1/2] Alternate implementation to support workers defined by hostnames that may resolve to different ip addresses during the worker lifespan. --- distributed/scheduler.py | 28 ++++++++++++++++++---------- distributed/utils.py | 2 +- distributed/worker.py | 5 +++++ 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4f3c8520063..7516ced48c3 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -13,6 +13,7 @@ import random import six import warnings +import socket import psutil import sortedcontainers @@ -1272,7 +1273,16 @@ def add_worker(self, comm=None, address=None, keys=(), ncores=None, metrics=None, pid=0, services=None, local_directory=None): """ Add a new worker to the cluster """ with log_errors(): - address = self.coerce_address(address, resolve_address) + try: + address = self.coerce_address(address, resolve_address) + except socket.gaierror as e: + # handle exceptions if hostname isn't found in dnslookup, + # perhaps because dns propgation is slow + msg = {'status': 'error', + 'message': 'address not found, %s' % address, + 'time': time()} + yield comm.write(msg) + return address = normalize_address(address) host = get_address_host(address) @@ -1280,6 +1290,13 @@ def add_worker(self, comm=None, address=None, keys=(), ncores=None, if ws is not None: raise ValueError("Worker already exists %s" % address) + if name in self.aliases: + msg = {'status': 'error', + 'message': 'name taken, %s' % name, + 'time': time()} + yield comm.write(msg) + return + self.workers[address] = ws = WorkerState( address=address, pid=pid, @@ -1290,13 +1307,6 @@ def add_worker(self, comm=None, address=None, keys=(), ncores=None, services=services ) - if name in self.aliases: - msg = {'status': 'error', - 'message': 'name taken, %s' % name, - 'time': time()} - yield comm.write(msg) - return - if 'addresses' not in self.host_info[host]: self.host_info[host].update({'addresses': set(), 'cores': 0}) @@ -4118,8 +4128,6 @@ def coerce_address(self, addr, resolve=True): Handles strings, tuples, or aliases. """ # XXX how many address-parsing routines do we have? - if addr in self.aliases: - addr = self.aliases[addr] if isinstance(addr, tuple): addr = unparse_host_port(*addr) if not isinstance(addr, six.string_types): diff --git a/distributed/utils.py b/distributed/utils.py index 306106bb0a9..5c31055860e 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -683,7 +683,7 @@ def silence_logging(level, root='distributed'): return old -@toolz.memoize +#@toolz.memoize def ensure_ip(hostname): """ Ensure that address is an IP address diff --git a/distributed/worker.py b/distributed/worker.py index 645ef40c16e..29779b707de 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -596,6 +596,11 @@ def _register_with_scheduler(self): continue future = gen.with_timeout(timedelta(seconds=diff), future) response = yield future + if response['status'] == 'error': + msg = response.get('message','') + if msg.startswith('name taken') or msg.startswith('address not found'): + yield gen.sleep(0.1) + continue _end = time() middle = (_start + _end) / 2 self.scheduler_delay = response['time'] - middle From 465631ebaf8a7f2671011668fdf6a0ab682984ce Mon Sep 17 00:00:00 2001 From: darindf Date: Wed, 3 Apr 2019 13:18:20 -0700 Subject: [PATCH 2/2] Don't throw error if worker is already added to the scheduler. --- distributed/scheduler.py | 9 ++++++++- distributed/worker.py | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7516ced48c3..dd5cca1e43d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1288,7 +1288,14 @@ def add_worker(self, comm=None, address=None, keys=(), ncores=None, ws = self.workers.get(address) if ws is not None: - raise ValueError("Worker already exists %s" % address) + self.remove_worker(address=address) + msg = {'status': 'error', + 'message': "worker already exists %s" % address, + 'time': time()} + yield comm.write(msg) + return + + # DD if name in self.aliases: msg = {'status': 'error', diff --git a/distributed/worker.py b/distributed/worker.py index 29779b707de..979b647f96f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -598,7 +598,7 @@ def _register_with_scheduler(self): response = yield future if response['status'] == 'error': msg = response.get('message','') - if msg.startswith('name taken') or msg.startswith('address not found'): + if msg.startswith('name taken') or msg.startswith('address not found') or msg.startswith('worker already exists'): yield gen.sleep(0.1) continue _end = time()