Skip to content

Commit

Permalink
Avoid getting data in main thread
Browse files: browse the repository at this point in the history
  • Loading branch information
mrocklin committed Jan 10, 2017
1 parent e811a64 commit b9c3f51
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions distributed/worker.py
Expand Up @@ -101,6 +101,7 @@ def __init__(self, scheduler_ip, scheduler_port, ip=None, ncores=None,
self.status = None
self.reconnect = reconnect
self.executor = executor or ThreadPoolExecutor(self.ncores)
self.admin_executor = ThreadPoolExecutor(2)
self.scheduler = rpc(ip=scheduler_ip, port=scheduler_port)
self.name = name
self.heartbeat_interval = heartbeat_interval
Expand Down Expand Up @@ -247,6 +248,7 @@ def _close(self, report=True, timeout=10):
io_loop=self.loop)
self.scheduler.close_rpc()
self.executor.shutdown()
self.admin_executor.shutdown()
if os.path.exists(self.local_dir):
shutil.rmtree(self.local_dir)

Expand Down Expand Up @@ -358,12 +360,22 @@ def delete_data(self, stream=None, keys=None, report=True):
keys=list(keys))
raise Return('OK')

def _get_data(self, keys):
    """Build the payload for a data request.

    Looks up each requested key in ``self.data`` and wraps the stored
    value with ``to_serialize``.  This is a plain (non-coroutine) method
    so that ``get_data`` can either call it directly or submit it to
    ``self.admin_executor``.  A key missing from ``self.data`` propagates
    whatever lookup error ``self.data`` raises.
    """
    payload = {}
    for key in keys:
        payload[key] = to_serialize(self.data[key])
    return payload

@gen.coroutine
def get_data(self, stream, keys=None, who=None):
start = time()

nbytes = {k: self.nbytes.get(k) for k in keys
if self.task_state.get(k) == 'memory'
or self.dep_state.get(k) == 'memory'}
if all(k in self.data.fast for k in nbytes) or len(nbytes) > 100:
msg = self._get_data(nbytes)
else:
msg = yield self.admin_executor.submit(self._get_data, nbytes)

msg = {k: to_serialize(self.data[k]) for k in keys if k in self.data}
nbytes = {k: self.nbytes.get(k) for k in keys if k in self.data}
stop = time()
if self.digests is not None:
self.digests['get-data-load-duration'].add(stop - start)
Expand Down Expand Up @@ -1373,13 +1385,14 @@ def ensure_communicating(self):

def send_task_state_to_scheduler(self, key):
if key in self.data:
value = self.data[key]
nbytes = self.nbytes[key] or sizeof(self.data[key])
typ = self.types.get(key) or type(self.data[key])
d = {'op': 'task-finished',
'status': 'OK',
'key': key,
'nbytes': self.nbytes.get(key) or sizeof(value),
'nbytes': nbytes,
'thread': self.threads.get(key),
'type': dumps_function(type(value))}
'type': dumps_function(typ)}
elif key in self.exceptions:
d = {'op': 'task-erred',
'status': 'error',
Expand Down

0 comments on commit b9c3f51

Please sign in to comment.