Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect timeouts when restarting #1304

Merged
merged 2 commits into from Aug 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion continuous_integration/setup_conda_environment.cmd
Expand Up @@ -32,7 +32,7 @@ call deactivate
mock ^
msgpack-python ^
psutil ^
pytest ^
pytest=3.1 ^
python=%PYTHON% ^
requests ^
toolz ^
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/travis/install.sh
Expand Up @@ -45,7 +45,7 @@ conda install -q -c conda-forge \
netcdf4 \
paramiko \
psutil \
pytest \
pytest=3.1 \
pytest-faulthandler \
pytest-timeout \
requests \
Expand Down
16 changes: 8 additions & 8 deletions distributed/nanny.py
Expand Up @@ -141,7 +141,7 @@ def start(self, addr_or_port=0):
self.loop.add_callback(self._start, addr_or_port)

@gen.coroutine
def kill(self, comm=None, timeout=10):
def kill(self, comm=None, timeout=2):
""" Kill the local worker process

Blocks until both the process is down and the scheduler is properly
Expand All @@ -152,7 +152,7 @@ def kill(self, comm=None, timeout=10):
raise gen.Return('OK')

deadline = self.loop.time() + timeout
yield self.process.kill(grace_delay=0.8 * (deadline - self.loop.time()))
yield self.process.kill(timeout=0.8 * (deadline - self.loop.time()))
yield self._unregister(deadline - self.loop.time())

@gen.coroutine
Expand Down Expand Up @@ -195,9 +195,9 @@ def instantiate(self, comm=None):
raise gen.Return('OK')

@gen.coroutine
def restart(self, comm=None):
def restart(self, comm=None, timeout=2):
if self.process is not None:
yield self.kill()
yield self.kill(timeout=timeout)
yield self.instantiate()
raise gen.Return('OK')

Expand Down Expand Up @@ -346,13 +346,13 @@ def mark_stopped(self):
self.on_exit(r)

@gen.coroutine
def kill(self, grace_delay=10):
def kill(self, timeout=2):
"""
Ensure the worker process is stopped, waiting at most
*grace_delay* seconds before terminating it abruptly.
*timeout* seconds before terminating it abruptly.
"""
loop = IOLoop.current()
deadline = loop.time() + grace_delay
deadline = loop.time() + timeout

if self.status == 'stopped':
return
Expand All @@ -372,7 +372,7 @@ def kill(self, grace_delay=10):

if process.is_alive():
logger.warning("Worker process still alive after %d seconds, killing",
grace_delay)
timeout)
try:
yield process.terminate()
except Exception as e:
Expand Down
3 changes: 2 additions & 1 deletion distributed/scheduler.py
Expand Up @@ -1574,7 +1574,8 @@ def restart(self, client=None, timeout=3):
if nanny_address is not None]

try:
resps = All([nanny.restart(close=True) for nanny in nannies])
resps = All([nanny.restart(close=True, timeout=timeout * 0.8)
for nanny in nannies])
resps = yield gen.with_timeout(timedelta(seconds=timeout), resps)
assert all(resp == 'OK' for resp in resps)
except gen.TimeoutError:
Expand Down
13 changes: 12 additions & 1 deletion distributed/tests/test_worker_failure.py
Expand Up @@ -18,7 +18,7 @@
from distributed.metrics import time
from distributed.utils import sync, ignoring
from distributed.utils_test import (gen_cluster, cluster, inc, loop, slow, div,
slowinc, slowadd)
slowinc, slowadd, captured_logger)


def test_submit_after_failed_worker_sync(loop):
Expand Down Expand Up @@ -372,3 +372,14 @@ def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
assert not any(n_worker_address in s for s in a.who_has.values())

yield n._close()


@gen_cluster(client=True, timeout=None, Worker=Nanny, ncores=[('127.0.0.1', 1)])
def test_restart_timeout_on_long_running_task(c, s, a):
    """Restarting while a long task is submitted must not log a timeout.

    Submits ``sleep(3600)``, calls ``c.restart()`` shortly afterwards, and
    then checks that nothing containing "timeout" (case-insensitive) was
    written to the ``distributed.scheduler`` logger in the meantime.
    """
    with captured_logger('distributed.scheduler') as sio:
        # NOTE(review): ``future`` is never read; presumably the reference is
        # held so the submitted task is not released before the restart --
        # confirm.
        future = c.submit(sleep, 3600)
        # Short pause before restarting; presumably gives the task time to
        # start running on the worker -- confirm.
        yield gen.sleep(0.1)
        yield c.restart()

    text = sio.getvalue()
    # Compare in lower case so any capitalisation of "timeout" is caught.
    assert 'timeout' not in text.lower()
4 changes: 2 additions & 2 deletions distributed/threadpoolexecutor.py
Expand Up @@ -64,9 +64,9 @@ def _adjust_thread_count(self):
self._threads.add(t)
t.start()

def shutdown(self):
def shutdown(self, wait=True):
with threads_lock:
thread.ThreadPoolExecutor.shutdown(self)
thread.ThreadPoolExecutor.shutdown(self, wait=wait)


def secede():
Expand Down
2 changes: 1 addition & 1 deletion distributed/worker.py
Expand Up @@ -329,7 +329,7 @@ def _close(self, report=True, timeout=10, nanny=True):
self.scheduler.unregister(address=self.address),
io_loop=self.loop)
self.scheduler.close_rpc()
self.executor.shutdown()
self.executor.shutdown(wait=False)
if os.path.exists(self.local_dir):
shutil.rmtree(self.local_dir)

Expand Down