Skip to content

Commit

Permalink
Revert "Avoid getting data in main thread"
Browse files Browse the repository at this point in the history
This reverts commit b9c3f51.
  • Loading branch information
mrocklin committed Jan 11, 2017
1 parent e923f31 commit 5e381c3
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 16 deletions.
6 changes: 3 additions & 3 deletions distributed/tests/test_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ def test_stress_communication(c, s, *workers):
lim = 8192
if soft < lim:
resource.setrlimit(resource.RLIMIT_NOFILE, (lim, max(hard, lim)))
except Exception as e:
pytest.skip("file descriptor limit too low and can't be increased :"
+ str(e))
except Exception as e:
pytest.skip("file descriptor limit too low and can't be increased :"
+ str(e))

n = 40
xs = [da.random.random((100, 100), chunks=(5, 5)) for i in range(n)]
Expand Down
14 changes: 1 addition & 13 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ def __init__(self, scheduler_ip, scheduler_port, ip=None, ncores=None,
self.status = None
self.reconnect = reconnect
self.executor = executor or ThreadPoolExecutor(self.ncores)
self.admin_executor = ThreadPoolExecutor(2)
self.scheduler = rpc(ip=scheduler_ip, port=scheduler_port)
self.name = name
self.heartbeat_interval = heartbeat_interval
Expand Down Expand Up @@ -248,7 +247,6 @@ def _close(self, report=True, timeout=10):
io_loop=self.loop)
self.scheduler.close_rpc()
self.executor.shutdown()
self.admin_executor.shutdown()
if os.path.exists(self.local_dir):
shutil.rmtree(self.local_dir)

Expand Down Expand Up @@ -360,22 +358,12 @@ def delete_data(self, stream=None, keys=None, report=True):
keys=list(keys))
raise Return('OK')

def _get_data(self, keys):
return {k: to_serialize(self.data[k]) for k in keys}

@gen.coroutine
def get_data(self, stream, keys=None, who=None):
start = time()

nbytes = {k: self.nbytes.get(k) for k in keys
if self.task_state.get(k) == 'memory'
or self.dep_state.get(k) == 'memory'}
if all(k in self.data.fast for k in nbytes) or len(nbytes) > 100:
msg = self._get_data(nbytes)
else:
msg = yield self.admin_executor.submit(self._get_data, nbytes)

msg = {k: to_serialize(self.data[k]) for k in keys if k in self.data}
nbytes = {k: self.nbytes.get(k) for k in keys if k in self.data}
stop = time()
if self.digests is not None:
self.digests['get-data-load-duration'].add(stop - start)
Expand Down

0 comments on commit 5e381c3

Please sign in to comment.