Skip to content

Commit

Permalink
Merge pull request #48 from mmerickel/sigkill
Browse files Browse the repository at this point in the history
upon reload send a SIGTERM first, then a SIGKILL as a last resort
  • Loading branch information
mmerickel authored Mar 7, 2019
2 parents 9eb33fd + d924833 commit c10ac9d
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 31 deletions.
10 changes: 10 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
unreleased
==========

- On systems that support ``SIGKILL`` and ``SIGTERM`` (not Windows), ``hupper``
will now send a ``SIGKILL`` to the worker process as a last resort. Normally,
a ``SIGINT`` (Ctrl-C) or ``SIGTERM`` (on reload) will kill the worker. If,
within ``shutdown_interval`` seconds, the worker doesn't exit, it will
receive a ``SIGKILL``.
See https://github.com/Pylons/hupper/pull/48

1.5 (2019-02-16)
================

Expand Down
25 changes: 25 additions & 0 deletions src/hupper/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import imp
import importlib
import site
import subprocess
import sys
import time

PY2 = sys.version_info[0] == 2
WIN = sys.platform == 'win32'
Expand Down Expand Up @@ -62,3 +64,26 @@ def get_site_packages(): # pragma: no cover
def with_metaclass(meta, base=object):
"""Create a base class with a metaclass."""
return meta("%sBase" % meta.__name__, (base,), {})


if PY2:

def subprocess_wait_with_timeout(process, timeout):
max_time = time.time() + timeout
while process.poll() is None:
dt = max_time - time.time()
if dt <= 0:
break
if dt > 0.5:
dt = 0.5
time.sleep(dt)
return process.poll()


else:

def subprocess_wait_with_timeout(process, timeout):
try:
return process.wait(timeout)
except subprocess.TimeoutExpired:
pass
17 changes: 17 additions & 0 deletions src/hupper/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .compat import WIN
from .compat import pickle
from .compat import queue
from .compat import subprocess_wait_with_timeout
from .utils import resolve_spec


Expand Down Expand Up @@ -316,3 +317,19 @@ def spawn_main(pipe_handle):
func = resolve_spec(spec)
func(**kwargs)
sys.exit(0)


def wait(process, timeout=None):
if timeout is None:
return process.wait()

if timeout == 0:
return process.poll()

return subprocess_wait_with_timeout(process, timeout)


def kill(process, soft=False):
if soft:
return process.terminate()
return process.kill()
66 changes: 41 additions & 25 deletions src/hupper/reloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
from .compat import queue
from .ipc import ProcessGroup
from .logger import DefaultLogger
from .utils import resolve_spec, is_watchdog_supported, is_watchman_supported
from .utils import default
from .utils import is_watchdog_supported
from .utils import is_watchman_supported
from .utils import resolve_spec
from .worker import Worker, is_active, get_reloader


Expand Down Expand Up @@ -82,6 +85,7 @@ def __init__(
monitor_factory,
logger,
reload_interval=1,
shutdown_interval=1,
worker_args=None,
worker_kwargs=None,
ignore_files=None,
Expand All @@ -92,6 +96,7 @@ def __init__(
self.ignore_files = ignore_files
self.monitor_factory = monitor_factory
self.reload_interval = reload_interval
self.shutdown_interval = shutdown_interval
self.logger = logger
self.monitor = None
self.group = ProcessGroup()
Expand Down Expand Up @@ -139,6 +144,7 @@ def _run_worker(self):
self.worker_path, args=self.worker_args, kwargs=self.worker_kwargs
)
worker.start()
force_restart = False

try:
# register the worker with the process group
Expand All @@ -147,30 +153,26 @@ def _run_worker(self):
self.logger.info('Starting monitor for PID %s.' % worker.pid)
self.monitor.clear_changes()

while not self.monitor.is_changed() and worker.is_alive():
while worker.is_alive():
if self.monitor.is_changed():
force_restart = True
break

try:
cmd = worker.pipe.recv(timeout=self.reload_interval)
except queue.Empty:
continue

if cmd is None:
if worker.is_alive():
# the worker socket has died but the process is still
# alive (somehow) so wait a brief period to see if it
# dies on its own - if it does die then we want to
# treat it as a crash and wait for changes before
# reloading, if it doesn't die then we want to force
# reload the app immediately because it probably
# didn't die due to some file changes
self.logger.info(
'Broken pipe to server with PID %s, waiting to '
'see if it dies.' % worker.pid
)
time.sleep(self.reload_interval)
self.logger.info(
'Broken pipe to server, triggering a reload.'
)
force_restart = True
break

if cmd[0] == 'reload':
self.logger.debug('Server triggered a reload.')
force_restart = True
break

if cmd[0] == 'watch':
Expand All @@ -180,27 +182,32 @@ def _run_worker(self):
else: # pragma: no cover
raise RuntimeError('received unknown command')

if worker.is_alive() and self.shutdown_interval is not None:
self.logger.info('Gracefully killing the server.')
worker.kill(soft=True)
worker.wait(self.shutdown_interval)

except KeyboardInterrupt:
if worker.is_alive():
self.logger.info('Waiting for server to exit ...')
time.sleep(self.reload_interval)
self.logger.info(
'Received interrupt, waiting for server to exit ...'
)
if self.shutdown_interval is not None:
worker.wait(self.shutdown_interval)
raise

finally:
if worker.is_alive():
self.logger.info('Killing server with PID %s.' % worker.pid)
worker.terminate()
self.logger.info('Server did not exit, forcefully killing.')
worker.kill()
worker.join()

else:
worker.join()
self.logger.info(
'Server with PID %s exited with code %d.'
% (worker.pid, worker.exitcode)
)
self.logger.debug('Server exited with code %d.' % worker.exitcode)

self.monitor.clear_changes()
return worker.terminated
return force_restart

def _wait_for_changes(self):
self.logger.info('Waiting for changes before reloading.')
Expand Down Expand Up @@ -268,6 +275,7 @@ def find_default_monitor_factory(logger):
def start_reloader(
worker_path,
reload_interval=1,
shutdown_interval=default,
verbose=1,
monitor_factory=None,
worker_args=None,
Expand All @@ -289,7 +297,11 @@ def start_reloader(
is invoking ``start_reloader`` in the first place.
``reload_interval`` is a value in seconds and will be used to throttle
restarts.
restarts. Default is ``1``.
``shutdown_interval`` is a value in seconds and will be used to trigger
a graceful shutdown of the server. Set to ``None`` to disable the graceful
shutdown. Default is the same as ``reload_interval``.
``verbose`` controls the output. Set to ``0`` to turn off any logging
of activity and turn up to ``2`` for extra output. Default is ``1``.
Expand Down Expand Up @@ -317,11 +329,15 @@ def start_reloader(
if monitor_factory is None:
monitor_factory = find_default_monitor_factory(logger)

if shutdown_interval is default:
shutdown_interval = reload_interval

reloader = Reloader(
worker_path=worker_path,
worker_args=worker_args,
worker_kwargs=worker_kwargs,
reload_interval=reload_interval,
shutdown_interval=shutdown_interval,
monitor_factory=monitor_factory,
logger=logger,
ignore_files=ignore_files,
Expand Down
11 changes: 11 additions & 0 deletions src/hupper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@
from .compat import WIN


class Sentinel(object):
def __init__(self, name):
self.name = name

def __repr__(self):
return '<{0}>'.format(self.name)


default = Sentinel('default')


def resolve_spec(spec):
modname, funcname = spec.rsplit('.', 1)
module = importlib.import_module(modname)
Expand Down
13 changes: 7 additions & 6 deletions src/hupper/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ def __init__(self, spec, args=None, kwargs=None):
self.worker_args = args
self.worker_kwargs = kwargs
self.pipe, self._child_pipe = ipc.Pipe()
self.terminated = False
self.pid = None
self.process = None
self.exitcode = None
Expand Down Expand Up @@ -167,15 +166,17 @@ def is_alive(self):
if self.exitcode is not None:
return False
if self.process:
return self.process.poll() is None
return ipc.wait(self.process, timeout=0) is None
return False

def terminate(self):
self.terminated = True
self.process.terminate()
def kill(self, soft=False):
return ipc.kill(self.process, soft=soft)

def wait(self, timeout=None):
return ipc.wait(self.process, timeout=timeout)

def join(self):
self.exitcode = self.process.wait()
self.exitcode = self.wait()

if self.stdin_termios:
ipc.restore_termios(sys.stdin.fileno(), self.stdin_termios)
Expand Down

0 comments on commit c10ac9d

Please sign in to comment.