diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index e416d5b2ad..c824fda85a 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -175,9 +175,9 @@ def test_stress_communication(c, s, *workers): lim = 8192 if soft < lim: resource.setrlimit(resource.RLIMIT_NOFILE, (lim, max(hard, lim))) - except Exception as e: - pytest.skip("file descriptor limit too low and can't be increased :" - + str(e)) + except Exception as e: + pytest.skip("file descriptor limit too low and can't be increased :" + + str(e)) n = 40 xs = [da.random.random((100, 100), chunks=(5, 5)) for i in range(n)] diff --git a/distributed/worker.py b/distributed/worker.py index bd993e332e..e05bfa8446 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -101,7 +101,6 @@ def __init__(self, scheduler_ip, scheduler_port, ip=None, ncores=None, self.status = None self.reconnect = reconnect self.executor = executor or ThreadPoolExecutor(self.ncores) - self.admin_executor = ThreadPoolExecutor(2) self.scheduler = rpc(ip=scheduler_ip, port=scheduler_port) self.name = name self.heartbeat_interval = heartbeat_interval @@ -248,7 +247,6 @@ def _close(self, report=True, timeout=10): io_loop=self.loop) self.scheduler.close_rpc() self.executor.shutdown() - self.admin_executor.shutdown() if os.path.exists(self.local_dir): shutil.rmtree(self.local_dir) @@ -360,22 +358,12 @@ def delete_data(self, stream=None, keys=None, report=True): keys=list(keys)) raise Return('OK') - def _get_data(self, keys): - return {k: to_serialize(self.data[k]) for k in keys} - @gen.coroutine def get_data(self, stream, keys=None, who=None): start = time() - nbytes = {k: self.nbytes.get(k) for k in keys - if self.task_state.get(k) == 'memory' - or self.dep_state.get(k) == 'memory'} - if all(k in self.data.fast for k in nbytes) or len(nbytes) > 100: - msg = self._get_data(nbytes) - else: - msg = yield self.admin_executor.submit(self._get_data, nbytes) - msg = {k: to_serialize(self.data[k]) for k in keys if k in self.data} + nbytes = {k: self.nbytes.get(k) for k in keys if k in self.data} stop = time() if self.digests is not None: self.digests['get-data-load-duration'].add(stop - start)