Skip to content

Commit

Permalink
Better support for waking up the main master thread of a pre-fork env…
Browse files Browse the repository at this point in the history
…ironment
  • Loading branch information
joamag committed Mar 8, 2019
1 parent 64d0c70 commit d994dc0
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 25 deletions.
4 changes: 2 additions & 2 deletions src/netius/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
from .config import conf, conf_prefix, conf_suffix, conf_s, conf_r, conf_d, conf_ctx
from .conn import OPEN, CLOSED, PENDING, CHUNK_SIZE, Connection
from .container import Container, ContainerServer
from .errors import NetiusError, RuntimeError, StopError, PauseError, DataError, ParserError,\
GeneratorError, SecurityError, NotImplemented, AssertionError
from .errors import NetiusError, RuntimeError, StopError, PauseError, WakeupError,\
DataError, ParserError, GeneratorError, SecurityError, NotImplemented, AssertionError
from .log import SILENT, rotating_handler, smtp_handler
from .observer import Observable
from .poll import Poll, EpollPoll, KqueuePoll, PollPoll, SelectPoll
Expand Down
45 changes: 22 additions & 23 deletions src/netius/base/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1518,6 +1518,12 @@ def stop(self):
if self.is_paused(): self.finish()
else: self._running = False

# in case the current process is the master in a pre-fork
# environment raises the stop error to wakeup the process
# from its current infinite loop for stop handling
if self._forked and not self._child:
raise errors.StopError("Wakeup")

def pause(self):
self._running = False
self._pausing = True
Expand Down Expand Up @@ -1829,9 +1835,7 @@ def pipe_send(message):
# registers for some of the common signals to be able to start
# the process of stopping and joining with the child processes
# in case there's a request to do so
def handler(signum = None, frame = None):
self.stop()
raise errors.StopError("Wakeup")
def handler(signum = None, frame = None): self.stop()
self.bind_signals(handler = handler)

# creates the pipe signal handler that is responsible for the
Expand All @@ -1849,16 +1853,13 @@ def callback():
command = pipein_fd.readline()[:-1]
self.on_command(command)

# raises an exception to stop the current loop
# cycle and allow proper review of execution
raise errors.StopError("Wakeup")

# schedules the current clojure to be executed as soon as
# possible and then forces the wakeup, because although we're
# running on the main thread we're possible under a blocking
# statement and so we need to wakeup the loop
self.delay(callback, immediately = True)
self.wakeup(force = True)
if hasattr(self, "_awaken") and not self._awaken:
raise errors.WakeupError("Delays")

# in case the user signal is defined registers for it so that it's
# possible to establish a communication between child and parent
Expand All @@ -1871,19 +1872,11 @@ def callback():
while self._running:
try:
self._wait_forever()
except BaseException as exception:
self.info("Parent process received exception: %s" % exception)

# prints a simple debug message about the sleep time that is going
# to occur to avoid child signal collision
self.debug("Sleeping for some time, to avoid collision of signals ...")

# sleeps for some time giving time to the child processes to
# process any pending signals (sending signals in the middle of
# signal processing may be problematic)
target = time.time() + 0.5
while time.time() < target:
time.sleep(0.25)
except (KeyboardInterrupt, SystemExit, errors.StopError):
pass
except Exception as exception:
self.warning("Parent process received exception: %s" % exception)
self.log_stack()

# closes both the file based pipe for input and the pipe used
# for the output of information (as expected)
Expand Down Expand Up @@ -3240,7 +3233,7 @@ def _delays(self):
# proper exception is set so that proper top level handling
# is defined and logging is performed
try: method()
except (KeyboardInterrupt, SystemExit):
except (KeyboardInterrupt, SystemExit, errors.StopError):
raise
except BaseException as exception:
self.error(exception)
Expand Down Expand Up @@ -3801,7 +3794,13 @@ def _format_delta(self, time_delta, count = 2):

def _wait_forever(self, sleep = 60):
while True:
time.sleep(sleep)
try:
self._awaken = True
try: self._delays()
finally: self._awaken = False
time.sleep(sleep)
except errors.WakeupError:
pass

class DiagBase(AbstractBase):

Expand Down
11 changes: 11 additions & 0 deletions src/netius/base/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ class PauseError(RuntimeError):

pass

class WakeupError(RuntimeError):
"""
Error used to send a wakeup intent from one context
or thread to another.
This is especially useful on the context of signal
handling where an interruption may happen at any time.
"""

pass

class DataError(RuntimeError):
"""
Error to be used for situations where the
Expand Down

0 comments on commit d994dc0

Please sign in to comment.