Skip to content

Commit

Permalink
Initial support for child to parent communication on pre-fork model
Browse files Browse the repository at this point in the history
  • Loading branch information
joamag committed Jan 17, 2019
1 parent 232f306 commit ad3afdc
Showing 1 changed file with 60 additions and 15 deletions.
75 changes: 60 additions & 15 deletions src/netius/base/common.py
Expand Up @@ -1781,7 +1781,8 @@ def fork(self):
# sent to the output pipe, normalizes it and sends it
def pipe_send(message):
if not hasattr(signal, "SIGUSR1"): return
os.write(pipeout, message + "\n")
frame = legacy.bytes(message) + b"\n"
os.write(pipeout, frame)
os.kill(ppid, signal.SIGUSR1) #@UndefinedVariable

# prints a debug operation about the operation that is
Expand Down Expand Up @@ -1809,44 +1810,84 @@ def pipe_send(message):
# valid value should be returned (force logic continuation)
if self._child: return True

# marks the current (parent) process as running and sets
# its current state as "started"
self._running = True
self._pausing = False
self.set_state(STATE_START)

# opens a file object for the input pipe so that it's easier
# to read it as a stream (read a complete line)
pipein_fd = os.fdopen(pipein)

# registers for some of the common signals to be able to avoid
# any possible interaction with the joining process
def handler(signum = None, frame = None): raise errors.StopError("Stop")
def handler(signum = None, frame = None):
self.stop()
raise errors.StopError("Wakeup")
self.bind_signals(handler = handler)

# creates the pipe signal handler that is responsible for the
# reading of the pipe information from the child process to
# the parent process (as expected)
def pipe_handler(signum = None, frame = None):
try:
command = pipein.readline()[:-1]
except BaseException as exception:
print(exception)
return
print("Received command %s" % command)
print(command)
# in case the current process is considered to be not
# running then returns the control flow immediately
# not possible to handle any command
if not self._running: return

# reads a line from the input pipe considering it to be a
# command and then calls the callbacks for command
command = pipein_fd.readline()[:-1]
self.on_command(command)

# raises an exception to stop the current loop
# cycle and allow proper review of execution
raise errors.StopError("Wakeup")

# in case the user signal is defined registers for it so that it's
# possible to establish a communication between child and parent
if hasattr(signal, "SIGUSR1"):
signal.signal(signal.SIGUSR1, pipe_handler) #@UndefinedVariable

# sleeps forever, waiting for an interruption of the current
# process that triggers the children to quit, so that it's
# able to "join" all of them into the current process
try: self._wait_forever()
except: pass
while self._running:
try:
self._wait_forever()
except BaseException as exception:
self.info("Parent process received exception: %s" % exception)

# prints a simple debug message about the sleep time that is going
# to occur to avoid child signal collision
self.debug("Sleeping for some time, to avoid collision of signals ...")

# sleeps for some time giving time to the child processes to
# process any pending signals (sending signals in the middle of
# signal processing may be problematic)
target = time.time() + 0.5
while time.time() < target:
time.sleep(0.25)

# closes both the file based pipe for input and the pipe used
# for the output of information (as expected)
pipein_fd.close()
os.close(pipeout)

# prints a debug information about the processes to be joined
# this indicated the start of the joining process
self.debug("Joining '%d' child processes ..." % self.children)

# iterates over the complete set of children to send the proper
# terminate signal to each of them for proper termination
for pid in self._childs: os.kill(pid, signal.SIGTERM) #@UndefinedVariable
for pid in self._childs:
os.kill(pid, signal.SIGTERM) #@UndefinedVariable

# iterates over the complete set of child processes to join
# them (master process responsibility)
for pid in self._childs: os.waitpid(pid, 0)
for pid in self._childs:
os.waitpid(pid, 0)

# calls the final (on) join method indicating that the complete
# set of child processes have been join and that now only the
Expand Down Expand Up @@ -2394,6 +2435,9 @@ def on_child(self, pipe = None):
self.bind_signals(handler = signal.SIG_IGN)
self.bind_signals(signals = (signal.SIGTERM,))

def on_command(self, command):
self.trigger("command", self, command)

def on_diag(self):
self.trigger("diag", self)

Expand Down Expand Up @@ -3730,8 +3774,9 @@ def _format_delta(self, time_delta, count = 2):
delta_s += "%ds" % seconds
return delta_s.strip()

def _wait_forever(self):
while True: time.sleep(60)
def _wait_forever(self, sleep = 60):
while True:
time.sleep(sleep)

class DiagBase(AbstractBase):

Expand Down

0 comments on commit ad3afdc

Please sign in to comment.