Skip to content
Permalink
Browse files

Allow to run multiple io.process.Process objects simultaneously

Currently, Process incorrectly handles stdout/stderr channels which rely on

global state:

  - static methods used to handle read/close events are instantiated on

    module import. Thus, they are being mocked by addHandler() which sets

    .channel to them. Starting a new process causes resetting that field

    on the global method objects, thus all processes will receive output

    from the last started process. To mitigate this, static methods are

    replaced with closure-like lambdas.

  - Process incorrectly raises stopped() event to close stdout/stderr. This

    event, however, is used to signal Manager's death. Thus, when process is

    completed, all sockets/pipes in the entire program are closed. This is why

    stopped() event has to be replaced with process-specific terminated() event

    and we need explicitly send close() to all process pipes.
  • Loading branch information...
myaut authored and prologic committed Apr 2, 2016
1 parent 056e15f commit 884b0710e578d44be485aed0daa58b9dfd574765
Showing with 56 additions and 28 deletions.
  1. +29 −27 circuits/io/process.py
  2. +27 −1 tests/io/test_process.py
@@ -7,10 +7,23 @@
from subprocess import Popen, PIPE

from circuits.core.manager import TIMEOUT
from circuits import handler, BaseComponent
from circuits import handler, BaseComponent, Event

from .file import File
from .events import started, stopped, write
from .events import started, write, close


class terminated(Event):
"""terminated Event
This Event is sent when a process is completed
:param args: (process)
:type tuple: tuple
"""

def __init__(self, *args):
super(terminated, self).__init__(*args)


class Process(BaseComponent):
@@ -73,39 +86,31 @@ def start(self):
).register(self)

self._stderr_read_handler = self.addHandler(
handler("read", channel="{0:d}.stderr".format(self.p.pid))(
self.__class__._on_stderr_read
handler("read", channel=self._stderr.channel)(
lambda self, data: self.stderr.write(data)
)
)

self._stdout_read_handler = self.addHandler(
handler("read", channel="{0:d}.stdout".format(self.p.pid))(
self.__class__._on_stdout_read
handler("read", channel=self._stdout.channel)(
lambda self, data: self.stdout.write(data)
)
)

self._stderr_closed_handler = self.addHandler(
handler("closed", channel="{0:d}.stderr".format(self.p.pid))(
self.__class__._on_stderr_closed
handler("closed", channel=self._stderr.channel)(
lambda self: setattr(self, '_stderr_closed', True)
)
)

self._stdout_closed_handler = self.addHandler(
handler("closed", channel="{0:d}.stdout".format(self.p.pid))(
self.__class__._on_stdout_closed
handler("closed", channel=self._stdout.channel)(
lambda self: setattr(self, '_stdout_closed', True)
)
)

self.fire(started(self))

@staticmethod
def _on_stdout_closed(self):
self._stdout_closed = True

@staticmethod
def _on_stderr_closed(self):
self._stderr_closed = True

def stop(self):
if self.p is not None:
self.p.terminate()
@@ -127,14 +132,6 @@ def status(self):
if getattr(self, "p", None) is not None:
return self.p.poll()

@staticmethod
def _on_stderr_read(self, data):
self.stderr.write(data)

@staticmethod
def _on_stdout_read(self, data):
self.stdout.write(data)

@handler("generate_events")
def _on_generate_events(self, event):
if self.p is not None and self._status is None:
@@ -147,7 +144,12 @@ def _on_generate_events(self, event):
self.removeHandler(self._stdout_read_handler)
self.removeHandler(self._stderr_closed_handler)
self.removeHandler(self._stdout_closed_handler)
self.fire(stopped(self))

self.fire(terminated(self))
self.fire(close(), self._stdin.channel,
self._stdout.channel,
self._stderr.channel)

event.reduce_time_left(0)
event.stop()
else:
@@ -14,7 +14,7 @@ def test(manager, watcher):
p.start()
assert watcher.wait("started", p.channel)

assert watcher.wait("stopped", p.channel)
assert watcher.wait("terminated", p.channel)

s = p.stdout.getvalue()
assert s == b"Hello World!\n"
@@ -39,3 +39,29 @@ def test2(manager, watcher, tmpdir):

with foo.open("r") as f:
assert f.read() == "Hello World!"

def test_two_procs(manager, watcher):
p1 = Process(["echo", "1"]).register(manager)
p2 = Process("echo 2 ; sleep 1", shell = True).register(manager)

p1.start()
p2.start()

assert watcher.wait("terminated", p1.channel)
assert p1._terminated
assert not p2._terminated
assert not p2._stdout_closed
assert not p2._stderr_closed

watcher.clear() # Get rid of first terminated()

s1 = p1.stdout.getvalue()
assert s1 == b"1\n"

assert watcher.wait("terminated", p2.channel)
assert p2._terminated
assert p2._stdout_closed
assert p2._stderr_closed

s2 = p2.stdout.getvalue()
assert s2 == b"2\n"

0 comments on commit 884b071

Please sign in to comment.
You can’t perform that action at this time.