From e15317ce4c3adc1f4cdfe71ec245cbb653de1e02 Mon Sep 17 00:00:00 2001 From: darindf Date: Mon, 1 Apr 2019 15:10:38 -0700 Subject: [PATCH 1/4] Add support for workers that are referenced by hostnames, whose ip addresses may change after network connection is established, such as being suspended and resumed. --- distributed/scheduler.py | 42 +++++++++++++++++++++++----------------- distributed/utils.py | 1 - 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5e5e2843c2c..1c5432d3103 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -925,7 +925,7 @@ def __init__( self.total_occupancy = 0 self.host_info = defaultdict(dict) self.resources = defaultdict(dict) - self.aliases = dict() + self.aliases = defaultdict(set) self._task_state_collections = [self.unrunnable] @@ -1363,20 +1363,16 @@ def add_worker( ws = self.workers.get(address) if ws is not None: - raise ValueError("Worker already exists %s" % address) - - self.workers[address] = ws = WorkerState( - address=address, - pid=pid, - ncores=ncores, - memory_limit=memory_limit, - name=name, - local_directory=local_directory, - services=services, - ) + msg = { + "status": "error", + "message": "worker already exists %s" % address, + "time": time(), + } + yield comm.write(msg) + return - if name in self.aliases: - msg = { + if name in self.aliases and address in self.aliases[name]: + msg = { "status": "error", "message": "name taken, %s" % name, "time": time(), @@ -1384,6 +1380,16 @@ def add_worker( yield comm.write(msg) return + self.workers[address] = ws = WorkerState( + address=address, + pid=pid, + ncores=ncores, + memory_limit=memory_limit, + name=name, + local_directory=local_directory, + services=services + ) + if "addresses" not in self.host_info[host]: self.host_info[host].update({"addresses": set(), "cores": 0}) @@ -1391,7 +1397,7 @@ def add_worker( self.host_info[host]["cores"] += ncores self.total_ncores += ncores - self.aliases[name] = address + self.aliases[name].append(address) response = self.heartbeat_worker( address=address, @@ -1856,7 +1862,9 @@ def remove_worker(self, comm=None, address=None, safe=False, close=True): self.rpc.remove(address) del self.stream_comms[address] - del self.aliases[ws.name] + self.aliases[ws.name].remove(address) + if not self.aliases[ws.name]: + del self.aliases[ws.name] self.idle.discard(ws) self.saturated.discard(ws) del self.workers[address] @@ -4391,8 +4399,6 @@ def coerce_address(self, addr, resolve=True): Handles strings, tuples, or aliases. """ # XXX how many address-parsing routines do we have? - if addr in self.aliases: - addr = self.aliases[addr] if isinstance(addr, tuple): addr = unparse_host_port(*addr) if not isinstance(addr, six.string_types): diff --git a/distributed/utils.py b/distributed/utils.py index 5259e567358..629b60e4802 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -743,7 +743,6 @@ def silence_logging(level, root="distributed"): return old -@toolz.memoize def ensure_ip(hostname): """ Ensure that address is an IP address From 8a32ff93a0554f1a232d1c330bf3c087cfb6d0d7 Mon Sep 17 00:00:00 2001 From: darindf Date: Mon, 22 Apr 2019 08:53:40 -0700 Subject: [PATCH 2/4] Resolve conflicts --- distributed/scheduler.py | 33 ++++++++++- distributed/worker.py | 124 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 153 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 1c5432d3103..1474a2969f7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -925,7 +925,7 @@ def __init__( self.total_occupancy = 0 self.host_info = defaultdict(dict) self.resources = defaultdict(dict) - self.aliases = defaultdict(set) + self.aliases = defaultdict(list) self._task_state_collections = [self.unrunnable] @@ -1363,20 +1363,32 @@ def add_worker( ws = self.workers.get(address) if ws is not None: +<<<<<<< HEAD msg = { "status": "error", "message": "worker already exists %s" % address, "time": time(), } +======= + msg = {'status': 'error', + 'message': "worker already exists %s" % address, + 'time': time()} +>>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. yield comm.write(msg) return if name in self.aliases and address in self.aliases[name]: +<<<<<<< HEAD msg = { "status": "error", "message": "name taken, %s" % name, "time": time(), } +======= + msg = {'status': 'error', + 'message': 'name taken, %s' % name, + 'time': time()} +>>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. yield comm.write(msg) return @@ -1390,8 +1402,11 @@ def add_worker( services=services ) +<<<<<<< HEAD if "addresses" not in self.host_info[host]: self.host_info[host].update({"addresses": set(), "cores": 0}) +======= +>>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. self.host_info[host]["addresses"].add(address) self.host_info[host]["cores"] += ncores @@ -1399,6 +1414,7 @@ def add_worker( self.total_ncores += ncores self.aliases[name].append(address) +<<<<<<< HEAD response = self.heartbeat_worker( address=address, resolve_address=resolve_address, @@ -1407,6 +1423,13 @@ def add_worker( host_info=host_info, metrics=metrics, ) +======= + response = self.heartbeat_worker(address=address, + resolve_address=resolve_address, + now=now, resources=resources, + host_info=host_info, + metrics=metrics) +>>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. # Do not need to adjust self.total_occupancy as self.occupancy[ws] cannot exist before this. self.check_idle_saturated(ws) @@ -1862,7 +1885,10 @@ def remove_worker(self, comm=None, address=None, safe=False, close=True): self.rpc.remove(address) del self.stream_comms[address] - self.aliases[ws.name].remove(address) + try: + self.aliases[ws.name].remove(address) + except ValueError: + pass if not self.aliases[ws.name]: del self.aliases[ws.name] self.idle.discard(ws) @@ -4416,7 +4442,8 @@ def coerce_hostname(self, host): Coerce the hostname of a worker. """ if host in self.aliases: - return self.workers[self.aliases[host]].host + # get last as it's most recent + return self.workers[self.aliases[host][-1]].host else: return host diff --git a/distributed/worker.py b/distributed/worker.py index 9f940f02f93..830d9ffe140 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -24,7 +24,7 @@ from tornado.gen import Return from tornado import gen from tornado.ioloop import IOLoop -from tornado.locks import Event +from tornado.locks import Event, Lock from . import profile, comm from .batched import BatchedSend @@ -502,6 +502,7 @@ def __init__( self.scheduler_delay = 0 self.stream_comms = dict() self.heartbeat_active = False + self.heartbeat_lock = Lock() self._ipython_kernel = None if self.local_dir not in sys.path: @@ -671,6 +672,7 @@ def _register_with_scheduler(self): raise gen.Return try: _start = time() +<<<<<<< HEAD comm = yield connect( self.scheduler.address, connection_args=self.connection_args ) @@ -694,6 +696,26 @@ def _register_with_scheduler(self): serializers=["msgpack"], ) future = comm.read(deserializers=["msgpack"]) +======= + comm = yield connect(self.scheduler.address, + connection_args=self.connection_args) + yield comm.write(dict(op='register-worker', + reply=False, + address=self.contact_address, + keys=list(self.data), + ncores=self.ncores, + name=self.name, + nbytes=self.nbytes, + now=time(), + resources=self.total_resources, + memory_limit=self.memory_limit, + local_directory=self.local_dir, + services=self.service_ports, + pid=os.getpid(), + metrics=self.get_metrics()), + serializers=['msgpack']) + future = comm.read(deserializers=['msgpack']) +>>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. if self.death_timeout: diff = self.death_timeout - (time() - start) if diff < 0: @@ -710,8 +732,14 @@ def _register_with_scheduler(self): yield gen.sleep(0.1) except gen.TimeoutError: logger.info("Timed out when connecting to scheduler") +<<<<<<< HEAD if response["status"] != "OK": raise ValueError("Unexpected response from register: %r" % (response,)) +======= + if response['status'] != 'OK': + logger.warning("Unexpected response from register: %r" % + (response,)) +>>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. else: # Retrieve eventual init functions and run them for function_bytes in response["worker-setups"]: @@ -727,6 +755,7 @@ def _register_with_scheduler(self): logger.info(" Registered to: %26s", self.scheduler.address) logger.info("-" * 49) +<<<<<<< HEAD self.batched_stream = BatchedSend(interval="2ms", loop=self.loop) self.batched_stream.start(comm) self.periodic_callbacks["heartbeat"].start() @@ -758,6 +787,99 @@ def heartbeat(self): self.heartbeat_active = False else: logger.debug("Heartbeat skipped: channel busy") +======= + while True: + if self.death_timeout and time() > start + self.death_timeout: + yield self._close(timeout=1) + return + if self.status in ('closed', 'closing'): + raise gen.Return + try: + _start = time() + comm = yield connect(self.scheduler.address, + connection_args=self.connection_args) + yield comm.write(dict(op='register-worker', + reply=False, + address=self.contact_address, + keys=list(self.data), + ncores=self.ncores, + name=self.name, + nbytes=self.nbytes, + now=time(), + resources=self.total_resources, + memory_limit=self.memory_limit, + local_directory=self.local_dir, + services=self.service_ports, + pid=os.getpid(), + metrics=self.get_metrics()), + serializers=['msgpack']) + future = comm.read(deserializers=['msgpack']) + if self.death_timeout: + diff = self.death_timeout - (time() - start) + if diff < 0: + continue + future = gen.with_timeout(timedelta(seconds=diff), future) + response = yield future + _end = time() + middle = (_start + _end) / 2 + self.scheduler_delay = response['time'] - middle + self.status = 'running' + break + except EnvironmentError: + logger.info('Waiting to connect to: %26s', self.scheduler.address) + yield gen.sleep(0.1) + except gen.TimeoutError: + logger.info("Timed out when connecting to scheduler") + if response['status'] != 'OK': + logger.warning("Unexpected response from register: %r" % + (response,)) + else: + # Retrieve eventual init functions and run them + for function_bytes in response['worker-setups']: + setup_function = pickle.loads(function_bytes) + if has_arg(setup_function, 'dask_worker'): + result = setup_function(dask_worker=self) + else: + result = setup_function() + logger.info('Init function %s ran: output=%s' % (setup_function, result)) + + logger.info(' Registered to: %26s', self.scheduler.address) + logger.info('-' * 49) + + self.batched_stream = BatchedSend(interval='2ms', loop=self.loop) + self.batched_stream.start(comm) + finally: + self.periodic_callbacks['heartbeat'].start() + self.loop.add_callback(self.handle_scheduler, comm) + + @gen.coroutine + def heartbeat(self): + with (yield self.heartbeat_lock.acquire()): + if not self.heartbeat_active: + self.heartbeat_active = True + logger.debug("Heartbeat: %s" % self.address) + try: + start = time() + response = yield self.scheduler.heartbeat_worker( + address=self.contact_address, + now=time(), + metrics=self.get_metrics() + ) + end = time() + middle = (start + end) / 2 + + if response['status'] == 'missing': + yield self._register_with_scheduler() + return + self.scheduler_delay = response['time'] - middle + self.periodic_callbacks['heartbeat'].callback_time = response['heartbeat-interval'] * 1000 + except CommClosedError: + logger.warning("Heartbeat to scheduler failed") + finally: + self.heartbeat_active = False + else: + logger.debug("Heartbeat skipped: channel busy") +>>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. @gen.coroutine def handle_scheduler(self, comm): From 63c89775432c06ce4d2c122662f7b941e38135e0 Mon Sep 17 00:00:00 2001 From: darindf Date: Fri, 12 Apr 2019 15:17:51 -0700 Subject: [PATCH 3/4] Cleanup --- distributed/worker.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 830d9ffe140..3b174b2d540 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -698,22 +698,22 @@ def _register_with_scheduler(self): future = comm.read(deserializers=["msgpack"]) ======= comm = yield connect(self.scheduler.address, - connection_args=self.connection_args) + connection_args=self.connection_args) yield comm.write(dict(op='register-worker', - reply=False, - address=self.contact_address, - keys=list(self.data), - ncores=self.ncores, - name=self.name, - nbytes=self.nbytes, - now=time(), - resources=self.total_resources, - memory_limit=self.memory_limit, - local_directory=self.local_dir, - services=self.service_ports, - pid=os.getpid(), - metrics=self.get_metrics()), - serializers=['msgpack']) + reply=False, + address=self.contact_address, + keys=list(self.data), + ncores=self.ncores, + name=self.name, + nbytes=self.nbytes, + now=time(), + resources=self.total_resources, + memory_limit=self.memory_limit, + local_directory=self.local_dir, + services=self.service_ports, + pid=os.getpid(), + metrics=self.get_metrics()), + serializers=['msgpack']) future = comm.read(deserializers=['msgpack']) >>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. if self.death_timeout: @@ -761,6 +761,7 @@ def _register_with_scheduler(self): self.periodic_callbacks["heartbeat"].start() self.loop.add_callback(self.handle_scheduler, comm) +<<<<<<< HEAD @gen.coroutine def heartbeat(self): if not self.heartbeat_active: @@ -851,6 +852,12 @@ def heartbeat(self): finally: self.periodic_callbacks['heartbeat'].start() self.loop.add_callback(self.handle_scheduler, comm) +======= + self.batched_stream = BatchedSend(interval='2ms', loop=self.loop) + self.batched_stream.start(comm) + self.periodic_callbacks['heartbeat'].start() + self.loop.add_callback(self.handle_scheduler, comm) +>>>>>>> Cleanup @gen.coroutine def heartbeat(self): From e1f0590c42220d15ced039f16849eea4ab7be798 Mon Sep 17 00:00:00 2001 From: darindf Date: Mon, 15 Apr 2019 15:26:56 -0700 Subject: [PATCH 4/4] Fix acceptance --- distributed/scheduler.py | 45 +++------ distributed/tests/test_scheduler.py | 6 +- distributed/worker.py | 141 ++-------------------------- 3 files changed, 20 insertions(+), 172 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 1474a2969f7..2010d49f414 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1363,50 +1363,35 @@ def add_worker( ws = self.workers.get(address) if ws is not None: -<<<<<<< HEAD - msg = { + msg = { "status": "error", "message": "worker already exists %s" % address, "time": time(), } -======= - msg = {'status': 'error', - 'message': "worker already exists %s" % address, - 'time': time()} ->>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. yield comm.write(msg) return if name in self.aliases and address in self.aliases[name]: -<<<<<<< HEAD - msg = { + msg = { "status": "error", "message": "name taken, %s" % name, "time": time(), } -======= - msg = {'status': 'error', - 'message': 'name taken, %s' % name, - 'time': time()} ->>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. yield comm.write(msg) return self.workers[address] = ws = WorkerState( - address=address, - pid=pid, - ncores=ncores, - memory_limit=memory_limit, - name=name, - local_directory=local_directory, - services=services + address=address, + pid=pid, + ncores=ncores, + memory_limit=memory_limit, + name=name, + local_directory=local_directory, + services=services, ) -<<<<<<< HEAD if "addresses" not in self.host_info[host]: self.host_info[host].update({"addresses": set(), "cores": 0}) -======= ->>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. self.host_info[host]["addresses"].add(address) self.host_info[host]["cores"] += ncores @@ -1414,7 +1399,6 @@ def add_worker( self.total_ncores += ncores self.aliases[name].append(address) -<<<<<<< HEAD response = self.heartbeat_worker( address=address, resolve_address=resolve_address, @@ -1423,13 +1407,6 @@ def add_worker( host_info=host_info, metrics=metrics, ) -======= - response = self.heartbeat_worker(address=address, - resolve_address=resolve_address, - now=now, resources=resources, - host_info=host_info, - metrics=metrics) ->>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. # Do not need to adjust self.total_occupancy as self.occupancy[ws] cannot exist before this. self.check_idle_saturated(ws) @@ -1885,7 +1862,7 @@ def remove_worker(self, comm=None, address=None, safe=False, close=True): self.rpc.remove(address) del self.stream_comms[address] - try: + try: self.aliases[ws.name].remove(address) except ValueError: pass @@ -4425,6 +4402,8 @@ def coerce_address(self, addr, resolve=True): Handles strings, tuples, or aliases. """ # XXX how many address-parsing routines do we have? + if addr in self.aliases: + addr = self.aliases[addr][-1] if isinstance(addr, tuple): addr = unparse_host_port(*addr) if not isinstance(addr, six.string_types): diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 02f15e1e1a2..e9ee30dd2c7 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -536,11 +536,7 @@ def test_worker_name(): s.start(0) w = yield Worker(s.ip, s.port, name="alice") assert s.workers[w.address].name == "alice" - assert s.aliases["alice"] == w.address - - with pytest.raises(ValueError): - w2 = yield Worker(s.ip, s.port, name="alice") - yield w2._close() + assert w.address in s.aliases["alice"] yield s.close() yield w._close() diff --git a/distributed/worker.py b/distributed/worker.py index 3b174b2d540..985581b2718 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -672,7 +672,6 @@ def _register_with_scheduler(self): raise gen.Return try: _start = time() -<<<<<<< HEAD comm = yield connect( self.scheduler.address, connection_args=self.connection_args ) @@ -696,26 +695,6 @@ def _register_with_scheduler(self): serializers=["msgpack"], ) future = comm.read(deserializers=["msgpack"]) -======= - comm = yield connect(self.scheduler.address, - connection_args=self.connection_args) - yield comm.write(dict(op='register-worker', - reply=False, - address=self.contact_address, - keys=list(self.data), - ncores=self.ncores, - name=self.name, - nbytes=self.nbytes, - now=time(), - resources=self.total_resources, - memory_limit=self.memory_limit, - local_directory=self.local_dir, - services=self.service_ports, - pid=os.getpid(), - metrics=self.get_metrics()), - serializers=['msgpack']) - future = comm.read(deserializers=['msgpack']) ->>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. if self.death_timeout: diff = self.death_timeout - (time() - start) if diff < 0: @@ -732,14 +711,8 @@ def _register_with_scheduler(self): yield gen.sleep(0.1) except gen.TimeoutError: logger.info("Timed out when connecting to scheduler") -<<<<<<< HEAD if response["status"] != "OK": - raise ValueError("Unexpected response from register: %r" % (response,)) -======= - if response['status'] != 'OK': - logger.warning("Unexpected response from register: %r" % - (response,)) ->>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. + logger.warning("Unexpected response from register: %r" % (response,)) else: # Retrieve eventual init functions and run them for function_bytes in response["worker-setups"]: @@ -755,110 +728,11 @@ def _register_with_scheduler(self): logger.info(" Registered to: %26s", self.scheduler.address) logger.info("-" * 49) -<<<<<<< HEAD self.batched_stream = BatchedSend(interval="2ms", loop=self.loop) self.batched_stream.start(comm) self.periodic_callbacks["heartbeat"].start() self.loop.add_callback(self.handle_scheduler, comm) -<<<<<<< HEAD - @gen.coroutine - def heartbeat(self): - if not self.heartbeat_active: - self.heartbeat_active = True - logger.debug("Heartbeat: %s" % self.address) - try: - start = time() - response = yield self.scheduler.heartbeat_worker( - address=self.contact_address, now=time(), metrics=self.get_metrics() - ) - end = time() - middle = (start + end) / 2 - - if response["status"] == "missing": - yield self._register_with_scheduler() - return - self.scheduler_delay = response["time"] - middle - self.periodic_callbacks["heartbeat"].callback_time = ( - response["heartbeat-interval"] * 1000 - ) - except CommClosedError: - logger.warning("Heartbeat to scheduler failed") - finally: - self.heartbeat_active = False - else: - logger.debug("Heartbeat skipped: channel busy") -======= - while True: - if self.death_timeout and time() > start + self.death_timeout: - yield self._close(timeout=1) - return - if self.status in ('closed', 'closing'): - raise gen.Return - try: - _start = time() - comm = yield connect(self.scheduler.address, - connection_args=self.connection_args) - yield comm.write(dict(op='register-worker', - reply=False, - address=self.contact_address, - keys=list(self.data), - ncores=self.ncores, - name=self.name, - nbytes=self.nbytes, - now=time(), - resources=self.total_resources, - memory_limit=self.memory_limit, - local_directory=self.local_dir, - services=self.service_ports, - pid=os.getpid(), - metrics=self.get_metrics()), - serializers=['msgpack']) - future = comm.read(deserializers=['msgpack']) - if self.death_timeout: - diff = self.death_timeout - (time() - start) - if diff < 0: - continue - future = gen.with_timeout(timedelta(seconds=diff), future) - response = yield future - _end = time() - middle = (_start + _end) / 2 - self.scheduler_delay = response['time'] - middle - self.status = 'running' - break - except EnvironmentError: - logger.info('Waiting to connect to: %26s', self.scheduler.address) - yield gen.sleep(0.1) - except gen.TimeoutError: - logger.info("Timed out when connecting to scheduler") - if response['status'] != 'OK': - logger.warning("Unexpected response from register: %r" % - (response,)) - else: - # Retrieve eventual init functions and run them - for function_bytes in response['worker-setups']: - setup_function = pickle.loads(function_bytes) - if has_arg(setup_function, 'dask_worker'): - result = setup_function(dask_worker=self) - else: - result = setup_function() - logger.info('Init function %s ran: output=%s' % (setup_function, result)) - - logger.info(' Registered to: %26s', self.scheduler.address) - logger.info('-' * 49) - - self.batched_stream = BatchedSend(interval='2ms', loop=self.loop) - self.batched_stream.start(comm) - finally: - self.periodic_callbacks['heartbeat'].start() - self.loop.add_callback(self.handle_scheduler, comm) -======= - self.batched_stream = BatchedSend(interval='2ms', loop=self.loop) - self.batched_stream.start(comm) - self.periodic_callbacks['heartbeat'].start() - self.loop.add_callback(self.handle_scheduler, comm) ->>>>>>> Cleanup - @gen.coroutine def heartbeat(self): with (yield self.heartbeat_lock.acquire()): @@ -868,25 +742,24 @@ def heartbeat(self): try: start = time() response = yield self.scheduler.heartbeat_worker( - address=self.contact_address, - now=time(), - metrics=self.get_metrics() + address=self.contact_address, now=time(), metrics=self.get_metrics() ) end = time() middle = (start + end) / 2 - if response['status'] == 'missing': + if response["status"] == "missing": yield self._register_with_scheduler() return - self.scheduler_delay = response['time'] - middle - self.periodic_callbacks['heartbeat'].callback_time = response['heartbeat-interval'] * 1000 + self.scheduler_delay = response["time"] - middle + self.periodic_callbacks["heartbeat"].callback_time = ( + response["heartbeat-interval"] * 1000 + ) except CommClosedError: logger.warning("Heartbeat to scheduler failed") finally: self.heartbeat_active = False else: logger.debug("Heartbeat skipped: channel busy") ->>>>>>> Improve worker/scheduler communication such that scheduler state isn't changed until checks are made and scheduler responses to the worker are not exceptions, rather "error" messages that the worker can process. @gen.coroutine def handle_scheduler(self, comm):