From 1a619a4a6e16396b8c9af0876e564d7acf0b741f Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 7 Jan 2017 17:30:51 -0800 Subject: [PATCH] squash --- distributed/batched.py | 3 +++ distributed/tests/test_stress.py | 13 ++++++------- distributed/worker.py | 7 ++++--- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/distributed/batched.py b/distributed/batched.py index f2c370d820..83818f3f0e 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -121,3 +121,6 @@ def close(self, ignore_closed=False): if not ignore_closed: raise yield close(self.stream) + + def closed(self): + return self.stream is None or self.stream.closed() diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index 21b7e4e2cd..d1469209cd 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -216,14 +216,13 @@ def test_stress_steal(c, s, *workers): if not s.processing: break -from toolz import frequencies - -@gen_cluster(ncores=[('127.0.0.1', 1)] * 10, client=True, timeout=60) +@slow +@gen_cluster(ncores=[('127.0.0.1', 1)] * 10, client=True, timeout=120) def test_close_connections(c, s, *workers): da = pytest.importorskip('dask.array') x = da.random.random(size=(1000, 1000), chunks=(1000, 1)) - for i in range(5): + for i in range(3): x = x.rechunk((1, 1000)) x = x.rechunk((1000, 1)) @@ -233,8 +232,8 @@ def test_close_connections(c, s, *workers): worker = random.choice(list(workers)) for stream in worker._listen_streams: stream.close() - print(frequencies(s.task_state.values())) - for w in workers: - print(w) + # print(frequencies(s.task_state.values())) + # for w in workers: + # print(w) yield _wait(future) diff --git a/distributed/worker.py b/distributed/worker.py index 39a13950d1..cef0c673e6 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -230,7 +230,7 @@ def identity(self, stream): 'memory_limit': self.memory_limit} @gen.coroutine - def _close(self, report=True, timeout=10): + def _close(self, report=True, timeout=3): if self.status in ('closed', 'closing'): return logger.info("Stopping worker at %s:%d", self.ip, self.port) @@ -238,7 +238,7 @@ def _close(self, report=True, timeout=10): self.stop() self.heartbeat_callback.stop() with ignoring(EnvironmentError): - if report: + if report and self.batched_stream and not self.batched_stream.closed(): yield gen.with_timeout(timedelta(seconds=timeout), self.scheduler.unregister(address=(self.ip, self.port)), io_loop=self.loop) @@ -948,7 +948,8 @@ def on_closed(_): self.priority_counter += 1 try: msgs = yield read(stream) - except EnvironmentError: + except EnvironmentError as e: + on_closed(None) break start = time()