implements the stderr/stdout stream redirection #75

Merged
merged 22 commits into from Apr 13, 2012
Jump to file or symbol
Failed to load files and symbols.
+418 −39
Diff settings

Always

Just for now

View
@@ -1,6 +1,15 @@
import logging
import os
+try:
+ from gevent import monkey
+ from gevent_zeromq import monkey_patch
+ monkey.patch_all()
+ monkey_patch()
+except ImportError:
+ pass
+
+
version_info = (0, 1, 0)
__version__ = ".".join(map(str, version_info))
@@ -11,7 +20,8 @@
def get_arbiter(watchers, controller='tcp://127.0.0.1:5555',
pubsub_endpoint='tcp://127.0.0.1:5556',
env=None, name=None, context=None,
- check_flapping=True, background=False):
+ check_flapping=True, background=False, stdout_stream=None,
+ stderr_stream=None):
"""Creates a Arbiter and a single watcher in it.
Options:
@@ -36,6 +46,10 @@ def get_arbiter(watchers, controller='tcp://127.0.0.1:5555',
- **uid** -- the user id used to run the flies (default: None)
- **gid** -- the group id used to run the flies (default: None)
- **env** -- the environment passed to the flies (default: None)
+ - **stdout_stream**: a callable that will receive the stream of
+ the process stdout.
+ - **stderr_stream**: a callable that will receive the stream of
+ the process stderr.
- **controller** -- the zmq entry point (default: 'tcp://127.0.0.1:5555')
- **pubsub_endpoint** -- the zmq entry point for the pubsub
@@ -45,6 +59,8 @@ def get_arbiter(watchers, controller='tcp://127.0.0.1:5555',
(default:True)
- **background** -- If True, the arbiter is launched in a thread in the
background (default: False)
+
+
"""
from circus.watcher import Watcher
if background:
@@ -68,7 +84,10 @@ def get_arbiter(watchers, controller='tcp://127.0.0.1:5555',
uid=watcher.get('uid'),
gid=watcher.get('gid'),
env=watcher.get('env'),
- executable=watcher.get('executable'))
+ executable=watcher.get('executable'),
+ stdout_stream=watcher.get('stdout_stream'),
+ stderr_stream=watcher.get('stderr_stream'))
+
_watchers.append(watcher)
return Arbiter(_watchers, controller, pubsub_endpoint, context=context,
View
@@ -9,6 +9,8 @@
from circus.watcher import Watcher
from circus.pidfile import Pidfile
from circus import util
+from circus.stream import FileStream
+
MAXFD = 1024
if hasattr(os, "devnull"):
@@ -133,6 +135,28 @@ def main():
retry_in = cfg.dget(section, "retry_in", 7, int)
max_retry = cfg.dget(section, "max_retry", 5, int)
graceful_timeout = cfg.dget(section, "graceful_timeout", 30, int)
+ stderr_file = cfg.dget(section, 'stderr_file', None, str)
+ stdout_file = cfg.dget(section, 'stdout_file', None, str)
+ stderr_stream = cfg.dget(section, 'stderr_stream', None, str)
+ stdout_stream = cfg.dget(section, 'stdout_stream', None, str)
+
+ if stderr_stream is not None and stderr_file is not None:
+ raise ValueError('"stderr_stream" and "stderr_file" are '
+ 'mutually exclusive')
+
+ if stdout_stream is not None and stdout_file is not None:
+ raise ValueError('"stdout_stream" and "stdout_file" are '
+ 'mutually exclusive')
+
+ if stderr_file is not None:
+ stderr_stream = FileStream(stderr_file)
+ elif stderr_stream is not None:
+ stderr_stream = util.resolve_name(stderr_stream)
+
+ if stdout_file is not None:
+ stdout_stream = FileStream(stdout_file)
+ elif stdout_stream is not None:
+ stdout_stream = util.resolve_name(stdout_stream)
rlimits = {}
for cfg_name, cfg_value in cfg.items(section):
@@ -146,7 +170,8 @@ def main():
shell=shell, uid=uid, gid=gid, send_hup=send_hup,
times=times, within=within, retry_in=retry_in,
max_retry=max_retry, graceful_timeout=graceful_timeout,
- rlimits=rlimits)
+ rlimits=rlimits, stderr_stream=stderr_stream,
+ stdout_stream=stdout_stream)
watchers.append(watcher)
@@ -159,6 +184,8 @@ def main():
arbiter = Arbiter(watchers, endpoint, pubsub_endpoint, check)
try:
arbiter.start()
+ except KeyboardInterrupt:
+ pass
finally:
arbiter.stop()
if pidfile is not None:
View
@@ -137,6 +137,7 @@ def preexec_fn():
shell=self.shell, preexec_fn=preexec_fn,
env=self.env, close_fds=True, stdout=PIPE,
stderr=PIPE, executable=executable)
+ #bufsize=1)
self.started = time.time()
View
@@ -0,0 +1,158 @@
+import fcntl
+import errno
+import os
+import sys
+from Queue import Queue
+from threading import Thread
+import select
+import time
+
+
+class QueueStream(Queue):
+ def __call__(self, data):
+ self.put(data)
+
+ def close(self):
+ pass
+
+
+class FileStream(object):
+ def __init__(self, filename):
+ self._file = open(filename, 'a+')
+ self._buffer = []
+
+ def __call__(self, data):
+ self._file.write(data['data'])
+ self._file.flush()
+
+ def close(self):
+ self._file.close()
+
+
+class NamedPipe(object):
+ def __init__(self, pipe, process, name):
+ self.pipe = pipe
+ self.process = process
+ self.name = name
+ fcntl.fcntl(pipe, fcntl.F_SETFL, os.O_NONBLOCK)
+ self._fileno = pipe.fileno()
+
+ def fileno(self):
+ return self._fileno
+
+ def read(self, buffer):
+ if self.pipe.closed:
+ return
+ return self.pipe.read(buffer)
+
+
+class BaseRedirector(object):
+ def __init__(self, redirect, extra_info=None, buffer=1024, selector=None):
+ self.pipes = []
+ self._names = {}
+ self.redirect = redirect
+ self.extra_info = extra_info
+ self.buffer = buffer
+ self.running = False
+ if extra_info is None:
+ extra_info = {}
+ self.extra_info = extra_info
+ if selector is None:
+ selector = select.select
+ self.selector = selector
+
+ def add_redirection(self, name, process, pipe):
+ npipe = NamedPipe(pipe, process, name)
+ self.pipes.append(npipe)
+ self._names[process.pid, name] = npipe
+
+ def remove_redirection(self, name, process):
+ pipe = self._names[process.pid, name]
+ self.pipes.remove(pipe)
+ del self._names[process.pid, name]
+
+ def _select(self):
+ if len(self.pipes) == 0:
+ time.sleep(.1)
+ return
+
+ try:
+ try:
+ rlist, __, __ = self.selector(self.pipes, [], [])
+ except select.error: # need a non specific error
+ return
+
+ for pipe in rlist:
+ data = pipe.read(self.buffer)
+ if data:
+ datamap = {'data': data, 'pid': pipe.process.pid,
+ 'name': pipe.name}
+ datamap.update(self.extra_info)
+ self.redirect(datamap)
+ except IOError, ex:
+ if ex[0] != errno.EAGAIN:
+ raise
+ sys.exc_clear()
+
+# if gevent and gevent_zmq are available, let's use a Greenlet
+# o/wise fallback to a Thread + a select
+try:
+ import gevent_zeromq # just to make sure zmq will not block # NOQA
+ from gevent import Greenlet
+ from gevent.select import select as gselect
+
+ class GreenRedirector(BaseRedirector, Greenlet):
+ def __init__(self, redirect, extra_info=None, buffer=1024):
+ Greenlet.__init__(self)
+ BaseRedirector.__init__(self, redirect, extra_info, buffer,
+ selector=gselect)
+
+ def _run(self, *args, **kwargs):
+ while True:
+ self._select()
+
+ Redirector = GreenRedirector
+except ImportError:
+
+ class ThreadedRedirector(BaseRedirector, Thread):
+ def __init__(self, redirect, extra_info=None, buffer=1024):
+ Thread.__init__(self)
+ BaseRedirector.__init__(self, redirect, extra_info, buffer)
+ self.running = False
+
+ def run(self):
+ self.running = True
+
+ while self.running:
+ self._select()
+
+ def kill(self):
+ if not self.running:
+ return
+ self.running = False
+ self.join()
+
+ Redirector = ThreadedRedirector
+
+
+def get_pipe_redirector(redirect, extra_info=None, buffer=1024):
+ """Redirects data received in pipes to the redirect callable.
+
+ This function creates a separate thread that continuously reads
+ data in the provided pipe and sends it to the provided callable.
+
+ If Gevent and Gevent_zeromq are installed, this function will use
+ a Greenlet for efficiency. It will fallback to a plain thread otherwise,
+ and that may lead to poor performances and a lot of memory consumption
+ when you have a lot of workers.
+
+ The data is a mapping with a **data** key containing the data
+ received from the pipe, extended with all values passed in
+ **extra_info**
+
+ Options:
+ - **redirect**: the callable to send data to
+ - **extra_info**: a mapping of values to add to each call
+ - **buffer**: the size of the buffer when reading data
+ """
+ return Redirector(redirect, extra_info, buffer)
View
@@ -49,21 +49,32 @@ class TestCircus(unittest.TestCase):
def setUp(self):
self.arbiters = []
self.files = []
+ self.tmpfiles = []
def tearDown(self):
self._stop_runners()
- for file in self.files:
+ for file in self.files + self.tmpfiles:
if os.path.exists(file):
os.remove(file)
- def _run_circus(self, callable):
+ def get_tmpfile(self, content=None):
+ fd, file = mkstemp()
+ os.close(fd)
+ self.tmpfiles.append(file)
+ if content is not None:
+ with open(file, 'w') as f:
+ f.write(content)
+ return file
+
+ def _run_circus(self, callable, **kw):
resolve_name(callable) # used to check the callable
fd, testfile = mkstemp()
os.close(fd)
wdir = os.path.dirname(__file__)
args = ['generic.py', callable, testfile]
worker = {'cmd': _CMD, 'args': args, 'working_dir': wdir,
'name': 'test'}
+ worker.update(kw)
arbiter = get_arbiter([worker], background=True)
arbiter.start()
time.sleep(.3)
@@ -1,12 +1,23 @@
-import unittest
-import os
import sys
-import tempfile
import time
+
from circus.process import Process, RUNNING
+from circus.tests.support import TestCircus
+
+
+RLIMIT = '''\
+import resource, sys
+
+f = open(sys.argv[1], 'w')
+for limit in ('NOFILE', 'NPROC'):
+ res = getattr(resource, 'RLIMIT_%s' % limit)
+ f.write('%s=%s\\n' % (limit, resource.getrlimit(res)))
+
+f.close
+'''
-class TestProcess(unittest.TestCase):
+class TestProcess(TestCircus):
def test_base(self):
cmd = sys.executable
@@ -22,24 +33,14 @@ def test_base(self):
process.stop()
def test_rlimits(self):
- script_file = tempfile.mkstemp()[1]
- output_file = tempfile.mkstemp()[1]
- script = '''
-import resource, sys
-f = open(sys.argv[1], 'w')
-for limit in ('NOFILE', 'NPROC'):
- res = getattr(resource, 'RLIMIT_%s' % limit)
- f.write('%s=%s\\n' % (limit, resource.getrlimit(res)))
-f.close
- '''
- f = open(script_file, 'w')
- f.write(script)
- f.close()
+ script_file = self.get_tmpfile(RLIMIT)
+ output_file = self.get_tmpfile()
+
cmd = sys.executable
args = [script_file, output_file]
rlimits = {'nofile': 20,
- 'nproc': 20,
- }
+ 'nproc': 20}
+
process = Process('test', cmd, args=args, rlimits=rlimits)
# wait for the process to finish
while process.status == RUNNING:
@@ -48,17 +49,14 @@ def test_rlimits(self):
f = open(output_file, 'r')
output = {}
for line in f.readlines():
- (limit, value) = line.rstrip().split('=', 1)
+ limit, value = line.rstrip().split('=', 1)
output[limit] = value
f.close()
def srt2ints(val):
return [long(key) for key in val[1:-1].split(',')]
wanted = [20L, 20L]
- try:
- self.assertEqual(srt2ints(output['NOFILE']), wanted)
- self.assertEqual(srt2ints(output['NPROC']), wanted)
- finally:
- os.unlink(script_file)
- os.unlink(output_file)
+
+ self.assertEqual(srt2ints(output['NOFILE']), wanted)
+ self.assertEqual(srt2ints(output['NPROC']), wanted)
Oops, something went wrong.