Skip to content

Loading…

implements the stderr/stdout stream redirection #75

Merged
merged 22 commits into from

2 participants

@tarekziade

This branch implements the stderr/stdout stream redirection

@benoitc benoitc commented on an outdated diff
circus/stream.py
((18 lines not shown))
+class FileStream(object):
+ def __init__(self, filename):
+ self._file = open(filename, 'a+')
+ self._buffer = []
+
+ def __call__(self, data):
+ self._file.write(data['data'])
+ self._file.flush()
+
+ def close(self):
+ self._file.close()
+
+
+# if gevent and gevent_zmq are available, let's use a Greenlet
+# o/wise fallback to a Thread + a select
+try:
@benoitc
benoitc added a note

maybe having in the ini the setting

 [circus]
 backend = gevent

instead of testing on import would be better ? or do you think it can wait next release?

Why not, but 'backend' is a bit vague at this level.

If the idea is to say = we will use gevent here and there in Circus when it's useful to speed up and/or scale a feature, I propose to rename it to "use_gevent = 1"

@benoitc
benoitc added a note

mmm the idea is more: if we don't find gevent when you want is as the backend we raise, else we use thread.

use_gevent would mean for me that we fallback on thread if gevent is not installed. Both are OK for me, it would be interesting to have opinions of an OP or admin there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@tarekziade tarekziade merged commit bf9791c into master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 11, 2012
  1. @tarekziade
  2. @tarekziade

    added an example + tweaks

    tarekziade committed
  3. @tarekziade

    now using structured streams

    tarekziade committed
  4. @tarekziade
  5. @tarekziade

    added a QueueStream

    tarekziade committed
  6. @tarekziade

    simplified the api

    tarekziade committed
  7. @tarekziade

    cleanup

    tarekziade committed
Commits on Apr 13, 2012
  1. @tarekziade
  2. @tarekziade

    added an example + tweaks

    tarekziade committed
  3. @tarekziade

    now using structured streams

    tarekziade committed
  4. @tarekziade
  5. @tarekziade

    added a QueueStream

    tarekziade committed
  6. @tarekziade

    simplified the api

    tarekziade committed
  7. @tarekziade

    cleanup

    tarekziade committed
  8. @tarekziade

    more doc

    tarekziade committed
  9. @tarekziade

    fixed conflict

    tarekziade committed
  10. @tarekziade
  11. @tarekziade
  12. @tarekziade

    calling the thread init.

    tarekziade committed
  13. @tarekziade
  14. @tarekziade

    cleanup

    tarekziade committed
  15. @tarekziade
View
23 circus/__init__.py
@@ -1,6 +1,15 @@
import logging
import os
+try:
+ from gevent import monkey
+ from gevent_zeromq import monkey_patch
+ monkey.patch_all()
+ monkey_patch()
+except ImportError:
+ pass
+
+
version_info = (0, 1, 0)
__version__ = ".".join(map(str, version_info))
@@ -11,7 +20,8 @@
def get_arbiter(watchers, controller='tcp://127.0.0.1:5555',
pubsub_endpoint='tcp://127.0.0.1:5556',
env=None, name=None, context=None,
- check_flapping=True, background=False):
+ check_flapping=True, background=False, stdout_stream=None,
+ stderr_stream=None):
"""Creates a Arbiter and a single watcher in it.
Options:
@@ -36,6 +46,10 @@ def get_arbiter(watchers, controller='tcp://127.0.0.1:5555',
- **uid** -- the user id used to run the flies (default: None)
- **gid** -- the group id used to run the flies (default: None)
- **env** -- the environment passed to the flies (default: None)
+ - **stdout_stream**: a callable that will receive the stream of
+ the process stdout.
+ - **stderr_stream**: a callable that will receive the stream of
+ the process stderr.
- **controller** -- the zmq entry point (default: 'tcp://127.0.0.1:5555')
- **pubsub_endpoint** -- the zmq entry point for the pubsub
@@ -45,6 +59,8 @@ def get_arbiter(watchers, controller='tcp://127.0.0.1:5555',
(default:True)
- **background** -- If True, the arbiter is launched in a thread in the
background (default: False)
+
+
"""
from circus.watcher import Watcher
if background:
@@ -68,7 +84,10 @@ def get_arbiter(watchers, controller='tcp://127.0.0.1:5555',
uid=watcher.get('uid'),
gid=watcher.get('gid'),
env=watcher.get('env'),
- executable=watcher.get('executable'))
+ executable=watcher.get('executable'),
+ stdout_stream=watcher.get('stdout_stream'),
+ stderr_stream=watcher.get('stderr_stream'))
+
_watchers.append(watcher)
return Arbiter(_watchers, controller, pubsub_endpoint, context=context,
View
29 circus/circusd.py
@@ -9,6 +9,8 @@
from circus.watcher import Watcher
from circus.pidfile import Pidfile
from circus import util
+from circus.stream import FileStream
+
MAXFD = 1024
if hasattr(os, "devnull"):
@@ -133,6 +135,28 @@ def main():
retry_in = cfg.dget(section, "retry_in", 7, int)
max_retry = cfg.dget(section, "max_retry", 5, int)
graceful_timeout = cfg.dget(section, "graceful_timeout", 30, int)
+ stderr_file = cfg.dget(section, 'stderr_file', None, str)
+ stdout_file = cfg.dget(section, 'stdout_file', None, str)
+ stderr_stream = cfg.dget(section, 'stderr_stream', None, str)
+ stdout_stream = cfg.dget(section, 'stdout_stream', None, str)
+
+ if stderr_stream is not None and stderr_file is not None:
+ raise ValueError('"stderr_stream" and "stderr_file" are '
+ 'mutually exclusive')
+
+ if stdout_stream is not None and stdout_file is not None:
+ raise ValueError('"stdout_stream" and "stdout_file" are '
+ 'mutually exclusive')
+
+ if stderr_file is not None:
+ stderr_stream = FileStream(stderr_file)
+ elif stderr_stream is not None:
+ stderr_stream = util.resolve_name(stderr_stream)
+
+ if stdout_file is not None:
+ stdout_stream = FileStream(stdout_file)
+ elif stdout_stream is not None:
+ stdout_stream = util.resolve_name(stdout_stream)
rlimits = {}
for cfg_name, cfg_value in cfg.items(section):
@@ -146,7 +170,8 @@ def main():
shell=shell, uid=uid, gid=gid, send_hup=send_hup,
times=times, within=within, retry_in=retry_in,
max_retry=max_retry, graceful_timeout=graceful_timeout,
- rlimits=rlimits)
+ rlimits=rlimits, stderr_stream=stderr_stream,
+ stdout_stream=stdout_stream)
watchers.append(watcher)
@@ -159,6 +184,8 @@ def main():
arbiter = Arbiter(watchers, endpoint, pubsub_endpoint, check)
try:
arbiter.start()
+ except KeyboardInterrupt:
+ pass
finally:
arbiter.stop()
if pidfile is not None:
View
1 circus/process.py
@@ -137,6 +137,7 @@ def preexec_fn():
shell=self.shell, preexec_fn=preexec_fn,
env=self.env, close_fds=True, stdout=PIPE,
stderr=PIPE, executable=executable)
+ #bufsize=1)
self.started = time.time()
View
158 circus/stream.py
@@ -0,0 +1,158 @@
+import fcntl
+import errno
+import os
+import sys
+from Queue import Queue
+from threading import Thread
+import select
+import time
+
+
+class QueueStream(Queue):
+ def __call__(self, data):
+ self.put(data)
+
+ def close(self):
+ pass
+
+
+class FileStream(object):
+ def __init__(self, filename):
+ self._file = open(filename, 'a+')
+ self._buffer = []
+
+ def __call__(self, data):
+ self._file.write(data['data'])
+ self._file.flush()
+
+ def close(self):
+ self._file.close()
+
+
+class NamedPipe(object):
+ def __init__(self, pipe, process, name):
+ self.pipe = pipe
+ self.process = process
+ self.name = name
+ fcntl.fcntl(pipe, fcntl.F_SETFL, os.O_NONBLOCK)
+ self._fileno = pipe.fileno()
+
+ def fileno(self):
+ return self._fileno
+
+ def read(self, buffer):
+ if self.pipe.closed:
+ return
+ return self.pipe.read(buffer)
+
+
+class BaseRedirector(object):
+ def __init__(self, redirect, extra_info=None, buffer=1024, selector=None):
+ self.pipes = []
+ self._names = {}
+ self.redirect = redirect
+ self.extra_info = extra_info
+ self.buffer = buffer
+ self.running = False
+ if extra_info is None:
+ extra_info = {}
+ self.extra_info = extra_info
+ if selector is None:
+ selector = select.select
+ self.selector = selector
+
+ def add_redirection(self, name, process, pipe):
+ npipe = NamedPipe(pipe, process, name)
+ self.pipes.append(npipe)
+ self._names[process.pid, name] = npipe
+
+ def remove_redirection(self, name, process):
+ pipe = self._names[process.pid, name]
+ self.pipes.remove(pipe)
+ del self._names[process.pid, name]
+
+ def _select(self):
+ if len(self.pipes) == 0:
+ time.sleep(.1)
+ return
+
+ try:
+ try:
+ rlist, __, __ = self.selector(self.pipes, [], [])
+ except select.error: # need a non specific error
+ return
+
+ for pipe in rlist:
+ data = pipe.read(self.buffer)
+ if data:
+ datamap = {'data': data, 'pid': pipe.process.pid,
+ 'name': pipe.name}
+ datamap.update(self.extra_info)
+ self.redirect(datamap)
+ except IOError, ex:
+ if ex[0] != errno.EAGAIN:
+ raise
+ sys.exc_clear()
+
+# if gevent and gevent_zmq are available, let's use a Greenlet
+# o/wise fallback to a Thread + a select
+try:
+ import gevent_zeromq # just to make sure zmq will not block # NOQA
+ from gevent import Greenlet
+ from gevent.select import select as gselect
+
+ class GreenRedirector(BaseRedirector, Greenlet):
+ def __init__(self, redirect, extra_info=None, buffer=1024):
+ Greenlet.__init__(self)
+ BaseRedirector.__init__(self, redirect, extra_info, buffer,
+ selector=gselect)
+
+ def _run(self, *args, **kwargs):
+ while True:
+ self._select()
+
+ Redirector = GreenRedirector
+except ImportError:
+
+ class ThreadedRedirector(BaseRedirector, Thread):
+ def __init__(self, redirect, extra_info=None, buffer=1024):
+ Thread.__init__(self)
+ BaseRedirector.__init__(self, redirect, extra_info, buffer)
+ self.running = False
+
+ def run(self):
+ self.running = True
+
+ while self.running:
+ self._select()
+
+ def kill(self):
+ if not self.running:
+ return
+ self.running = False
+ self.join()
+
+ Redirector = ThreadedRedirector
+
+
+def get_pipe_redirector(redirect, extra_info=None, buffer=1024):
+ """Redirects data received in pipes to the redirect callable.
+
+ This function creates a separate thread that continuously reads
+ data in the provided pipe and sends it to the provided callable.
+
+ If Gevent and Gevent_zeromq are installed, this function will use
+ a Greenlet for efficiency. It will fallback to a plain thread otherwise,
+ and that may lead to poor performances and a lot of memory consumption
+ when you have a lot of workers.
+
+ The data is a mapping with a **data** key containing the data
+ received from the pipe, extended with all values passed in
+ **extra_info**
+
+ Options:
+ - **redirect**: the callable to send data to
+ - **extra_info**: a mapping of values to add to each call
+ - **buffer**: the size of the buffer when reading data
+ """
+ return Redirector(redirect, extra_info, buffer)
View
15 circus/tests/support.py
@@ -49,14 +49,24 @@ class TestCircus(unittest.TestCase):
def setUp(self):
self.arbiters = []
self.files = []
+ self.tmpfiles = []
def tearDown(self):
self._stop_runners()
- for file in self.files:
+ for file in self.files + self.tmpfiles:
if os.path.exists(file):
os.remove(file)
- def _run_circus(self, callable):
+ def get_tmpfile(self, content=None):
+ fd, file = mkstemp()
+ os.close(fd)
+ self.tmpfiles.append(file)
+ if content is not None:
+ with open(file, 'w') as f:
+ f.write(content)
+ return file
+
+ def _run_circus(self, callable, **kw):
resolve_name(callable) # used to check the callable
fd, testfile = mkstemp()
os.close(fd)
@@ -64,6 +74,7 @@ def _run_circus(self, callable):
args = ['generic.py', callable, testfile]
worker = {'cmd': _CMD, 'args': args, 'working_dir': wdir,
'name': 'test'}
+ worker.update(kw)
arbiter = get_arbiter([worker], background=True)
arbiter.start()
time.sleep(.3)
View
50 circus/tests/test_process.py
@@ -1,12 +1,23 @@
-import unittest
-import os
import sys
-import tempfile
import time
+
from circus.process import Process, RUNNING
+from circus.tests.support import TestCircus
+
+
+RLIMIT = '''\
+import resource, sys
+
+f = open(sys.argv[1], 'w')
+for limit in ('NOFILE', 'NPROC'):
+ res = getattr(resource, 'RLIMIT_%s' % limit)
+ f.write('%s=%s\\n' % (limit, resource.getrlimit(res)))
+
+f.close
+'''
-class TestProcess(unittest.TestCase):
+class TestProcess(TestCircus):
def test_base(self):
cmd = sys.executable
@@ -22,24 +33,14 @@ def test_base(self):
process.stop()
def test_rlimits(self):
- script_file = tempfile.mkstemp()[1]
- output_file = tempfile.mkstemp()[1]
- script = '''
-import resource, sys
-f = open(sys.argv[1], 'w')
-for limit in ('NOFILE', 'NPROC'):
- res = getattr(resource, 'RLIMIT_%s' % limit)
- f.write('%s=%s\\n' % (limit, resource.getrlimit(res)))
-f.close
- '''
- f = open(script_file, 'w')
- f.write(script)
- f.close()
+ script_file = self.get_tmpfile(RLIMIT)
+ output_file = self.get_tmpfile()
+
cmd = sys.executable
args = [script_file, output_file]
rlimits = {'nofile': 20,
- 'nproc': 20,
- }
+ 'nproc': 20}
+
process = Process('test', cmd, args=args, rlimits=rlimits)
# wait for the process to finish
while process.status == RUNNING:
@@ -48,7 +49,7 @@ def test_rlimits(self):
f = open(output_file, 'r')
output = {}
for line in f.readlines():
- (limit, value) = line.rstrip().split('=', 1)
+ limit, value = line.rstrip().split('=', 1)
output[limit] = value
f.close()
@@ -56,9 +57,6 @@ def srt2ints(val):
return [long(key) for key in val[1:-1].split(',')]
wanted = [20L, 20L]
- try:
- self.assertEqual(srt2ints(output['NOFILE']), wanted)
- self.assertEqual(srt2ints(output['NPROC']), wanted)
- finally:
- os.unlink(script_file)
- os.unlink(output_file)
+
+ self.assertEqual(srt2ints(output['NOFILE']), wanted)
+ self.assertEqual(srt2ints(output['NPROC']), wanted)
View
13 circus/tests/test_watcher.py
@@ -5,11 +5,15 @@
from circus.client import CircusClient, make_message
from circus.tests.support import TestCircus
+from circus.stream import QueueStream
def run_process(test_file):
try:
+ i = 0
while True:
+ sys.stdout.write('%d-%s\\n' % (os.getpid(), i))
+ sys.stdout.flush()
time.sleep(1)
except:
return 1
@@ -19,8 +23,10 @@ class TestWatcher(TestCircus):
def setUp(self):
super(TestWatcher, self).setUp()
+ self.stream = QueueStream()
dummy_process = 'circus.tests.test_watcher.run_process'
- self.test_file = self._run_circus(dummy_process)
+ self.test_file = self._run_circus(dummy_process,
+ stdout_stream=self.stream)
self.cli = CircusClient()
def call(self, cmd, **props):
@@ -64,3 +70,8 @@ def testStats(self):
self.assertEqual(resp['test']['1']['cmdline'],
sys.executable.split(os.sep)[-1])
+
+ def test_streams(self):
+ time.sleep(2.)
+ # let's see what we got
+ self.assertTrue(self.stream.qsize() > 1)
View
44 circus/util.py
@@ -6,6 +6,7 @@
import fcntl
import fnmatch
from functools import wraps
+import sys
from psutil.error import AccessDenied, NoSuchProcess
from circus import logger
@@ -236,3 +237,46 @@ def read_config(config_path):
cfg_files_read.extend(cfg.read(includes))
return cfg, cfg_files_read
+
+
+# taken from distutils2
+def resolve_name(name):
+ """Resolve a name like ``module.object`` to an object and return it.
+
+ This functions supports packages and attributes without depth limitation:
+ ``package.package.module.class.class.function.attr`` is valid input.
+ However, looking up builtins is not directly supported: use
+ ``__builtin__.name``.
+
+ Raises ImportError if importing the module fails or if one requested
+ attribute is not found.
+ """
+ if '.' not in name:
+ # shortcut
+ __import__(name)
+ return sys.modules[name]
+
+ # FIXME clean up this code!
+ parts = name.split('.')
+ cursor = len(parts)
+ module_name = parts[:cursor]
+ ret = ''
+
+ while cursor > 0:
+ try:
+ ret = __import__('.'.join(module_name))
+ break
+ except ImportError:
+ cursor -= 1
+ module_name = parts[:cursor]
+
+ if ret == '':
+ raise ImportError(parts[0])
+
+ for part in parts[1:]:
+ try:
+ ret = getattr(ret, part)
+ except AttributeError, exc:
+ raise ImportError(exc)
+
+ return ret
View
74 circus/watcher.py
@@ -7,6 +7,7 @@
from circus.process import Process, DEAD_OR_ZOMBIE
from circus import logger
from circus import util
+from circus.stream import get_pipe_redirector
class Watcher(object):
@@ -16,8 +17,34 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
gid=None, send_hup=False, env=None, stopped=True,
times=2, within=1., retry_in=7., max_retry=5,
graceful_timeout=30., prereload_fn=None,
- rlimits=None, executable=None):
- """ init
+ rlimits=None, executable=None, stdout_stream=None,
+ stderr_stream=None):
+ """
+
+ Options:
+ - XXX to complete
+
+ - **stdout_stream**: a callable that will receive the stream of
+ the process stdout.
+
+ Each entry is a mapping containing:
+
+ - **pid** - the process pid
+ - **name** - the stream name (*stderr* or *stdout*)
+ - **data** - the data
+
+ Defaults to None.
+
+ - **stderr_stream**: a callable that will receive the stream of
+ the process stderr.
+
+ Each entry is a mapping containing:
+
+ - **pid** - the process pid
+ - **name** - the stream name (*stderr* or *stdout*)
+ - **data** - the data
+
+ Defaults to None.
"""
self.name = name
self.res_name = name.lower().replace(" ", "_")
@@ -34,6 +61,17 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
self.graceful_timeout = 30
self.prereload_fn = prereload_fn
self.executable = None
+ self.stdout_stream = stdout_stream
+ if stdout_stream is not None:
+ self.stdout_redirector = get_pipe_redirector(stdout_stream)
+ else:
+ self.stdout_redirector = None
+
+ self.stderr_stream = stderr_stream
+ if stderr_stream is not None:
+ self.stderr_redirector = get_pipe_redirector(stderr_stream)
+ else:
+ self.stderr_redirector = None
self.optnames = ("numprocesses", "warmup_delay", "working_dir",
"uid", "gid", "send_hup", "shell", "env",
@@ -45,7 +83,6 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
working_dir = util.get_working_dir()
self.working_dir = working_dir
-
self.processes = {}
self.shell = shell
self.uid = uid
@@ -149,6 +186,17 @@ def spawn_process(self):
env=self.env, rlimits=self.rlimits,
executable=self.executable)
+ # stream stderr/stdout if configured
+ if self.stdout_redirector is not None:
+ self.stdout_redirector.add_redirection('stdout',
+ process,
+ process.stdout)
+
+ if self.stderr_redirector is not None:
+ self.stderr_redirector.add_redirection('stderr',
+ process,
+ process.stderr)
+
self.processes[self._process_counter] = process
logger.debug('running %s process [pid %d]', self.name,
process.pid)
@@ -170,6 +218,13 @@ def spawn_process(self):
def kill_process(self, process, sig=signal.SIGTERM):
"""Kill process.
"""
+ # remove redirections
+ if self.stdout_redirector is not None:
+ self.stdout_redirector.remove_redirection('stdout', process)
+
+ if self.stderr_redirector is not None:
+ self.stderr_redirector.remove_redirection('stderr', process)
+
self.send_msg("kill", {"process_id": process.wid,
"time": time.time()})
logger.debug("%s: kill process %s", self.name, process.pid)
@@ -239,6 +294,12 @@ def stop(self, graceful=True):
"""Stop.
"""
self.stopped = True
+ # stop redirectors
+ if self.stdout_redirector is not None:
+ self.stdout_redirector.kill()
+
+ if self.stderr_redirector is not None:
+ self.stderr_redirector.kill()
sig = signal.SIGQUIT
if not graceful:
@@ -257,6 +318,7 @@ def stop(self, graceful=True):
self.kill_processes(signal.SIGKILL)
if self.evpub_socket is not None:
self.send_msg("stop", {"time": time.time()})
+
logger.info('%s stopped', self.name)
@util.debuglog
@@ -269,6 +331,12 @@ def start(self):
self.stopped = False
self.reap_processes()
self.manage_processes()
+ if self.stdout_redirector is not None:
+ self.stdout_redirector.start()
+
+ if self.stderr_redirector is not None:
+ self.stderr_redirector.start()
+
logger.info('%s started' % self.name)
self.send_msg("start", {"time": time.time()})
View
27 docs/source/configuration.rst
@@ -8,6 +8,8 @@ Example::
[circus]
check_delay = 5
endpoint = tcp://127.0.0.1:5555
+ pubsub_endpoint = tcp://127.0.0.1:5556
+
[watcher:myprogram]
cmd = python
@@ -15,17 +17,22 @@ Example::
warmup_delay = 0
numprocesses = 5
+
circus (single section)
~~~~~~~~~~~~~~~~~~~~~~~
**endpoint**
- The endpoint to which the ZMQ socket will be bound.
+ The ZMQ socket used to manage Circus via **circusctl**.
+ (default: *tcp://127.0.0.1:5555*)
+ **pubsub_endpoint**
+ The ZMQ PUB/SUB socket receiving publications of events.
+ (default: *tcp://127.0.0.1:5556*)
**check_delay**
- The polling interval for the ZMQ socket.
+ The polling interval in seconds for the ZMQ socket. (default: 5)
**include**
- List of config files to include.
+ List of config files to include. (defaults: None)
**include_dir**
List of config directories. All files matching `*.ini` under each
- directory will be included.
+ directory will be included. (defaults: None)
watcher:NAME (as many sections as you want)
@@ -57,3 +64,15 @@ watcher:NAME (as many sections as you want)
<http://docs.python.org/library/resource.html#resource-limits>`_.
For example, the config line 'rlimit_nofile = 500' sets the maximum
number of open files to 500.
+ **stderr_file**
+ A file that will receive the **stderr** stream of all workers.
+ (default: none)
+ **stdout_file**
+ A file that will receive the **stdout** stream of all workers.
+ (default: none)
+ **stderr_stream**
+ A fully qualified Python callable thet will receive the **stderr**
+ stream of all workers. (default: none, incompatible with *stderr_file*.)
+ **stdout_stream**
+ A fully qualified Python callable thet will receive the **stdout**
+ stream of all workers. (default: none, incompatible with *stdout_file*.)
View
10 examples/circus2.ini
@@ -0,0 +1,10 @@
+[circus]
+check_delay = 5
+endpoint = tcp://127.0.0.1:5555
+
+[watcher:verbose]
+cmd = python
+args = -u verbose_fly.py
+warmup_delay = 0
+numprocesses = 200
+stdout_file = test.log
View
13 examples/verbose_fly.py
@@ -0,0 +1,13 @@
+#!/usr/bin/env python
+import os
+import time
+import sys
+
+i = 0
+
+while True:
+ #print '%d:%d' % (os.getpid(), i)
+ sys.stdout.write('%d:%d\n' % (os.getpid(), i))
+ sys.stdout.flush()
+ time.sleep(0.1)
+ i += 1
Something went wrong with that request. Please try again.