Skip to content

Commit

Permalink
make the configuration of redirection easier
Browse files Browse the repository at this point in the history
Now the configuration of a stream can accept any class and any
options to allow more flexibilty.

Configuration in the ini should be passed in this form:

    [watcher:name]
    ...
    stream_stdout.class = FileStream
    stream_stdout.file = filename
    stream_stdout.refresh_time = 0.3

Same for `stream_stderr` .

if the `file` option is passed withut any `class`, the `FileStream`
class will be used by default.

By default the `refresh_time` is 0.3.

The watcher dict created from the ini is:

    {
        ..
        'stream_stdout': {
            'class': FileStream,
            'file': 'filename',
            'refresh_time': 0.3
        }
    }
  • Loading branch information
benoitc committed Apr 17, 2012
1 parent b25d6f2 commit f60c41c
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 63 deletions.
1 change: 0 additions & 1 deletion circus/arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def load_from_config(cls, config_file):

watchers = []
for watcher in cfg.get('watchers', []):
watcher['stream_backend'] = cfg['stream_backend']
watchers.append(Watcher.load_from_config(watcher))

# creating arbiter
Expand Down
161 changes: 110 additions & 51 deletions circus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@
from circus import util


WATCHER_DEFAULTS = {
'name': '',
'cmd': '',
'args': '',
'numprocesses': 1,
'warmup_delay': 0,
'executable': None,
'working_dir': None,
'shell': False,
'uid': None,
'gid': None,
'send_hup': False,
'times': 2,
'within': 1,
'retry_in': 7,
'max_retry': 5,
'graceful_timeout': 30,
'rlimits': {},
'stderr_stream': {},
'stdout_stream': {},
'stream_backend': 'thread'}

class DefaultConfigParser(ConfigParser.ConfigParser):
def dget(self, section, option, default=None, type=str):
if not self.has_option(section, option):
Expand Down Expand Up @@ -43,6 +65,36 @@ def read_config(config_path):
return cfg, cfg_files_read


def stream_config(watcher_name, stream_conf):
if not stream_conf:
return stream_conf

if not 'class' in stream_conf:
# we can handle othe class there
if 'file' in stream_conf:
obj = FileStream
else:
raise ValueError("stream configuration invalid in %r" %
watcher_name)

else:
class_name = stream_conf.pop('class')
if not "." in class_name:
class_name = "circus.stream.%s" % class_name

obj = util.resolve_name(class_name)

# default refres_time
if not 'refresh_time' in stream_conf:
refresh_time = 0.3
else:
refresh_time = float(stream_conf.pop('refresh_time'))

# initialize stream instance
inst = obj(**stream_conf)

return {'stream': inst, 'refresh_time': refresh_time}

def get_config(config_file):
cfg, cfg_files_read = read_config(config_file)
dget = cfg.dget
Expand Down Expand Up @@ -77,60 +129,67 @@ def get_config(config_file):
watchers = []
for section in cfg.sections():
if section.startswith("watcher:"):
watcher = {}
watcher = WATCHER_DEFAULTS

This comment has been minimized.

Copy link
@KangOl

KangOl Apr 17, 2012

By using the dict directly, you share the config between all the watchers
This had to be

watcher = copy.deepcopy(WATCHER_DEFAULTS)

deepcopy is needed because the dict contains dict's
A function returning the defaults is a better option IMHO.

This comment has been minimized.

Copy link
@tarekziade

tarekziade Apr 18, 2012

Member

Do you mind creating a new issue for this ?

Thanks a lot

This comment has been minimized.

Copy link
@benoitc

benoitc Apr 18, 2012

Author Contributor

fixed in last head. Thanks a lot!

watcher['name'] = section.split("watcher:", 1)[1]
watcher['cmd'] = get(section, 'cmd')
watcher['args'] = dget(section, 'args', '')
watcher['numprocesses'] = dget(section, 'numprocesses', 1, int)
watcher['warmup_delay'] = dget(section, 'warmup_delay', 0, int)
watcher['executable'] = dget(section, 'executable', None, str)
watcher['working_dir'] = dget(section, 'working_dir')
watcher['shell'] = dget(section, 'shell', False, bool)
watcher['uid '] = dget(section, 'uid')
watcher['gid'] = dget(section, 'gid')
watcher['send_hup'] = dget(section, 'send_hup', False, bool)
watcher['times'] = dget(section, "times", 2, int)
watcher['within'] = dget(section, "within", 1, int)
watcher['retry_in'] = dget(section, "retry_in", 7, int)
watcher['max_retry'] = dget(section, "max_retry", 5, int)
watcher['graceful_timeout'] = dget(section, "graceful_timeout", 30,
int)

# loading the streams
stderr_file = dget(section, 'stderr_file', None, str)
stdout_file = dget(section, 'stdout_file', None, str)
stderr_stream = dget(section, 'stderr_stream', None, str)
stdout_stream = dget(section, 'stdout_stream', None, str)

if stderr_stream is not None and stderr_file is not None:
raise ValueError('"stderr_stream" and "stderr_file" are '
'mutually exclusive')

if stdout_stream is not None and stdout_file is not None:
raise ValueError('"stdout_stream" and "stdout_file" are '
'mutually exclusive')

if stderr_file is not None:
watcher['stderr_stream'] = FileStream(stderr_file)
elif stderr_stream is not None:
watcher['stderr_stream '] = util.resolve_name(stderr_stream)

if stdout_file is not None:
watcher['stdout_stream'] = FileStream(stdout_file)
elif stdout_stream is not None:
watcher['stdout_stream'] = util.resolve_name(stdout_stream)

rlimits = {}
for cfg_name, cfg_value in cfg.items(section):
if cfg_name.startswith('rlimit_'):
limit = cfg_name[7:]
rlimits[limit] = int(cfg_value)

watcher['rlimits'] = rlimits

# create watcher options
for opt, val in cfg.items(section):
if opt == 'cmd':
watcher['cmd'] = val
elif opt == 'args':
watcher['args'] = val
elif opt == 'numprocesses':
watcher['numprocesses'] = dget(section, 'numprocesses', 1,
int)
elif opt == 'warmup_delay':
watcher['warmup_delay'] = dget(section, 'warmup_delay', 0,
int)
elif opt == 'executable':
watcher['executable'] = dget(section, 'executable', None,
str)
elif opt == 'working_dir':
watcher['working_dir'] = val
elif opt == 'shell':
watcher['shell'] = dget(section, 'shell', False, bool)
elif opt == 'uid':
watcher['uid '] = val
elif opt == 'gid':
watcher['gid'] = val
elif opt == 'send_hup':
watcher['send_hup'] = dget(section, 'send_hup', False,
bool)
elif opt == 'times':
watcher['times'] = dget(section, "times", 2, int)
elif opt == 'within':
watcher['within'] = dget(section, "within", 1, int)
elif opt == 'retry_ind':
watcher['retry_in'] = dget(section, "retry_in", 7, int)
elif opt == 'max_retry':
watcher['max_retry'] = dget(section, "max_retry", 5, int)
elif opt == 'graceful_timout':
watcher['graceful_timeout'] = dget(section,
"graceful_timeout", 30, int)
elif opt.startswith('stderr_stream') or \
opt.startswith('stdout_stream'):
stream_name, stream_opt = opt.split(".", 1)
watcher[stream_name][stream_opt] = val
elif opt.startswith('rlimit_'):
limit = opt[7:]
rlimits[limit] = int(val)

# second pass, parse stream conf
stdout_conf = watcher.get('stdout_stream', {})
watcher['stdout_stream'] = stream_config(watcher['name'],
stdout_conf)

stderr_conf = watcher.get('stderr_stream', {})
watcher['stderr_stream'] = stream_config(watcher['name'],
stderr_conf)

# set the stream backend
watcher['stream_backend'] = stream_backend

watchers.append(watcher)

config['watchers'] = watchers


return config
19 changes: 16 additions & 3 deletions circus/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
from circus.util import import_module

class QueueStream(Queue):

def __init__(self, **kwargs):
Queue.__init__(self, **kwargs)

def __call__(self, data):
self.put(data)

Expand All @@ -11,8 +15,8 @@ def close(self):


class FileStream(object):
def __init__(self, filename):
self._file = open(filename, 'a+')
def __init__(self, file=None, **kwargs):
self._file = open(file, 'a+')
self._buffer = []

def __call__(self, data):
Expand Down Expand Up @@ -45,6 +49,15 @@ def get_pipe_redirector(redirect, backend='thread', extra_info=None,
- **buffer**: the size of the buffer when reading data
"""

# get stream infos
if 'stream' not in redirect:
return
stream = redirect.get('stream')
refresh_time = redirect.get('refresh_time', 0.3)

# get backend class
backend_mod = import_module("circus.stream.s%s" % backend)
backend_class = getattr(backend_mod, "Redirector")
return backend_class(redirect, extra_info, buffer)

# finally setup the redirection
return backend_class(stream, refresh_time, extra_info, buffer)
6 changes: 4 additions & 2 deletions circus/stream/sgevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
from circus.stream.base import BaseRedirector

class Redirector(BaseRedirector, Greenlet):
def __init__(self, redirect, extra_info=None, buffer=1024):
def __init__(self, redirect, refresh_time=0.3, extra_info=None,
buffer=1024):
Greenlet.__init__(self)
BaseRedirector.__init__(self, redirect, extra_info, buffer,
selector=select)
self.refresh_time = refresh_time

def _run(self, *args, **kwargs):
while True:
self._select()
sleep(0.3)
sleep(self.refresh_time)
6 changes: 4 additions & 2 deletions circus/stream/sthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@
from circus.stream.base import BaseRedirector

class Redirector(BaseRedirector, Thread):
def __init__(self, redirect, extra_info=None, buffer=1024):
def __init__(self, redirect, refresh_time=0.3, extra_info=None,
buffer=1024):
Thread.__init__(self)
BaseRedirector.__init__(self, redirect, extra_info, buffer)
self.running = False
self.refresh_time = refresh_time

def run(self):
self.running = True
while self.running:
self._select()
time.sleep(0.3)
time.sleep(self.refresh_time)

def kill(self):
if not self.running:
Expand Down
2 changes: 1 addition & 1 deletion circus/tests/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def setUp(self):
self.stream = QueueStream()
dummy_process = 'circus.tests.test_watcher.run_process'
self.test_file = self._run_circus(dummy_process,
stdout_stream=self.stream)
stdout_stream={'stream': self.stream})
self.cli = CircusClient()

def call(self, cmd, **props):
Expand Down
4 changes: 2 additions & 2 deletions circus/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ def __init__(self, name, cmd, args=None, numprocesses=1, warmup_delay=0.,
self.stream_backend = stream_backend

self.stdout_stream = stdout_stream
if stdout_stream is not None:
if stdout_stream:
self.stdout_redirector = get_pipe_redirector(stdout_stream,
backend=stream_backend)
else:
self.stdout_redirector = None

self.stderr_stream = stderr_stream
if stderr_stream is not None:
if stderr_stream:
self.stderr_redirector = get_pipe_redirector(stderr_stream,
backend=stream_backend)
else:
Expand Down
6 changes: 5 additions & 1 deletion examples/circus2.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ cmd = python
args = -u verbose_fly.py
warmup_delay = 0
numprocesses = 200
stdout_file = test.log

; stream options
stdout_stream.class = FileStream
stdout_stream.file = test.log
stdout_stream.refresh_time = 0.3

0 comments on commit f60c41c

Please sign in to comment.