diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5e5e2843c2c..2010d49f414 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -925,7 +925,7 @@ def __init__( self.total_occupancy = 0 self.host_info = defaultdict(dict) self.resources = defaultdict(dict) - self.aliases = dict() + self.aliases = defaultdict(list) self._task_state_collections = [self.unrunnable] @@ -1363,7 +1363,22 @@ def add_worker( ws = self.workers.get(address) if ws is not None: - raise ValueError("Worker already exists %s" % address) + msg = { + "status": "error", + "message": "worker already exists %s" % address, + "time": time(), + } + yield comm.write(msg) + return + + if name in self.aliases and address in self.aliases[name]: + msg = { + "status": "error", + "message": "name taken, %s" % name, + "time": time(), + } + yield comm.write(msg) + return self.workers[address] = ws = WorkerState( address=address, @@ -1375,15 +1390,6 @@ def add_worker( services=services, ) - if name in self.aliases: - msg = { - "status": "error", - "message": "name taken, %s" % name, - "time": time(), - } - yield comm.write(msg) - return - if "addresses" not in self.host_info[host]: self.host_info[host].update({"addresses": set(), "cores": 0}) @@ -1391,7 +1397,7 @@ def add_worker( self.host_info[host]["cores"] += ncores self.total_ncores += ncores - self.aliases[name] = address + self.aliases[name].append(address) response = self.heartbeat_worker( address=address, @@ -1856,7 +1862,12 @@ def remove_worker(self, comm=None, address=None, safe=False, close=True): self.rpc.remove(address) del self.stream_comms[address] - del self.aliases[ws.name] + try: + self.aliases[ws.name].remove(address) + except ValueError: + pass + if not self.aliases[ws.name]: + del self.aliases[ws.name] self.idle.discard(ws) self.saturated.discard(ws) del self.workers[address] @@ -4392,7 +4403,7 @@ def coerce_address(self, addr, resolve=True): """ # XXX how many address-parsing routines do we have? if addr in self.aliases: - addr = self.aliases[addr] + addr = self.aliases[addr][-1] if isinstance(addr, tuple): addr = unparse_host_port(*addr) if not isinstance(addr, six.string_types): @@ -4410,7 +4421,8 @@ def coerce_hostname(self, host): Coerce the hostname of a worker. """ if host in self.aliases: - return self.workers[self.aliases[host]].host + # get last as it's most recent + return self.workers[self.aliases[host][-1]].host else: return host diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 02f15e1e1a2..e9ee30dd2c7 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -536,11 +536,7 @@ def test_worker_name(): s.start(0) w = yield Worker(s.ip, s.port, name="alice") assert s.workers[w.address].name == "alice" - assert s.aliases["alice"] == w.address - - with pytest.raises(ValueError): - w2 = yield Worker(s.ip, s.port, name="alice") - yield w2._close() + assert w.address in s.aliases["alice"] yield s.close() yield w._close() diff --git a/distributed/utils.py b/distributed/utils.py index 5259e567358..629b60e4802 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -743,7 +743,6 @@ def silence_logging(level, root="distributed"): return old -@toolz.memoize def ensure_ip(hostname): """ Ensure that address is an IP address diff --git a/distributed/worker.py b/distributed/worker.py index 9f940f02f93..985581b2718 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -24,7 +24,7 @@ from tornado.gen import Return from tornado import gen from tornado.ioloop import IOLoop -from tornado.locks import Event +from tornado.locks import Event, Lock from . import profile, comm from .batched import BatchedSend @@ -502,6 +502,7 @@ def __init__( self.scheduler_delay = 0 self.stream_comms = dict() self.heartbeat_active = False + self.heartbeat_lock = Lock() self._ipython_kernel = None if self.local_dir not in sys.path: @@ -711,7 +712,7 @@ def _register_with_scheduler(self): except gen.TimeoutError: logger.info("Timed out when connecting to scheduler") if response["status"] != "OK": - raise ValueError("Unexpected response from register: %r" % (response,)) + logger.warning("Unexpected response from register: %r" % (response,)) else: # Retrieve eventual init functions and run them for function_bytes in response["worker-setups"]: @@ -734,30 +735,31 @@ def _register_with_scheduler(self): @gen.coroutine def heartbeat(self): - if not self.heartbeat_active: - self.heartbeat_active = True - logger.debug("Heartbeat: %s" % self.address) - try: - start = time() - response = yield self.scheduler.heartbeat_worker( - address=self.contact_address, now=time(), metrics=self.get_metrics() - ) - end = time() - middle = (start + end) / 2 - - if response["status"] == "missing": - yield self._register_with_scheduler() - return - self.scheduler_delay = response["time"] - middle - self.periodic_callbacks["heartbeat"].callback_time = ( - response["heartbeat-interval"] * 1000 - ) - except CommClosedError: - logger.warning("Heartbeat to scheduler failed") - finally: - self.heartbeat_active = False - else: - logger.debug("Heartbeat skipped: channel busy") + with (yield self.heartbeat_lock.acquire()): + if not self.heartbeat_active: + self.heartbeat_active = True + logger.debug("Heartbeat: %s" % self.address) + try: + start = time() + response = yield self.scheduler.heartbeat_worker( + address=self.contact_address, now=time(), metrics=self.get_metrics() + ) + end = time() + middle = (start + end) / 2 + + if response["status"] == "missing": + yield self._register_with_scheduler() + return + self.scheduler_delay = response["time"] - middle + self.periodic_callbacks["heartbeat"].callback_time = ( + response["heartbeat-interval"] * 1000 + ) + except CommClosedError: + logger.warning("Heartbeat to scheduler failed") + finally: + self.heartbeat_active = False + else: + logger.debug("Heartbeat skipped: channel busy") @gen.coroutine def handle_scheduler(self, comm):