Skip to content

Commit

Permalink
squash
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed Jan 8, 2017
1 parent 3eca3a4 commit 1a619a4
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 10 deletions.
3 changes: 3 additions & 0 deletions distributed/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,6 @@ def close(self, ignore_closed=False):
if not ignore_closed:
raise
yield close(self.stream)

def closed(self):
return self.stream is None or self.stream.closed()
13 changes: 6 additions & 7 deletions distributed/tests/test_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,13 @@ def test_stress_steal(c, s, *workers):
if not s.processing:
break

from toolz import frequencies


@gen_cluster(ncores=[('127.0.0.1', 1)] * 10, client=True, timeout=60)
@slow
@gen_cluster(ncores=[('127.0.0.1', 1)] * 10, client=True, timeout=120)
def test_close_connections(c, s, *workers):
da = pytest.importorskip('dask.array')
x = da.random.random(size=(1000, 1000), chunks=(1000, 1))
for i in range(5):
for i in range(3):
x = x.rechunk((1, 1000))
x = x.rechunk((1000, 1))

Expand All @@ -233,8 +232,8 @@ def test_close_connections(c, s, *workers):
worker = random.choice(list(workers))
for stream in worker._listen_streams:
stream.close()
print(frequencies(s.task_state.values()))
for w in workers:
print(w)
# print(frequencies(s.task_state.values()))
# for w in workers:
# print(w)

yield _wait(future)
7 changes: 4 additions & 3 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,15 @@ def identity(self, stream):
'memory_limit': self.memory_limit}

@gen.coroutine
def _close(self, report=True, timeout=10):
def _close(self, report=True, timeout=3):
if self.status in ('closed', 'closing'):
return
logger.info("Stopping worker at %s:%d", self.ip, self.port)
self.status = 'closing'
self.stop()
self.heartbeat_callback.stop()
with ignoring(EnvironmentError):
if report:
if report and self.batched_stream and not self.batched_stream.closed():
yield gen.with_timeout(timedelta(seconds=timeout),
self.scheduler.unregister(address=(self.ip, self.port)),
io_loop=self.loop)
Expand Down Expand Up @@ -948,7 +948,8 @@ def on_closed(_):
self.priority_counter += 1
try:
msgs = yield read(stream)
except EnvironmentError:
except EnvironmentError as e:
on_closed(None)
break

start = time()
Expand Down

0 comments on commit 1a619a4

Please sign in to comment.