Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

minor updates

  • Loading branch information...
commit 027131f53efcd6efce48cd23a705cadf982e285e 1 parent a38b1cc
Andrew authored
Showing with 72 additions and 43 deletions.
  1. +2 −0  README.md
  2. +1 −2  TODO
  3. +22 −21 setup.py
  4. +18 −4 slurp
  5. +29 −16 slurp.py
View
2  README.md
@@ -6,7 +6,9 @@
## Dependencies
+* lockfile >= 1.9
* pyinotify >= 0.9.3
+* python-daemon >= 1.5
* Python >= 2.5
View
3  TODO
@@ -1,4 +1,3 @@
-- hup handler for conf reload when daemonized (this should consumer code too)
-- move daemonize from .py into script and use python-daemon
+- hup handler for conf reload when daemonized (this should reload consumer code too)
- allow ||-ized monitoring (e.g. map paths, consumers to a tag and use 1 worker for that tag, no tag uses default worker)
- need contrib/examples.py
View
43 setup.py
@@ -5,18 +5,19 @@
setup(
- name = "slurp",
- version = __version__,
- description = "Log file slurper",
- author = "noone",
- author_email = "noone@nowhere.com",
- url = "https://github.com/bninja/slurp",
- keywords = [
- "slurp",
+ name='slurp',
+ version=__version__,
+ description='Log file slurper',
+ author='noone',
+ author_email='noone@nowhere.com',
+ url='https://github.com/bninja/slurp',
+ keywords=[
+ 'slurp',
],
- install_requires = [
- "pyinotify==0.9.3",
- "lockfile==0.9.1",
+ install_requires=[
+ 'lockfile==0.9.1',
+ 'pyinotify==0.9.3',
+ 'python-daemon==1.6',
],
py_modules=[
'slurp',
@@ -24,15 +25,15 @@
scripts=[
'slurp',
],
- classifiers = [
- "Development Status :: 4 - Beta",
- "Intended Audience :: Developers",
- "License :: OSI Approved :: MIT License",
- "Operating System :: OS Independent",
- "Programming Language :: Python",
- "Programming Language :: Python :: 2.5",
- "Programming Language :: Python :: 2.6",
- "Programming Language :: Python :: 2.7",
- "Topic :: Software Development :: Libraries :: Python Modules",
+ classifiers=[
+ 'Development Status :: 4 - Beta',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: MIT License',
+ 'Operating System :: OS Independent',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2.5',
+ 'Programming Language :: Python :: 2.6',
+ 'Programming Language :: Python :: 2.7',
+ 'Topic :: Software Development :: Libraries :: Python Modules',
],
)
View
22 slurp
@@ -6,6 +6,8 @@ import os
import sys
import tempfile
+import daemon
+
import slurp
@@ -44,6 +46,8 @@ def main():
opt_parser.add_option(
'--enable-syslog', action='store_true', default=False)
opt_parser.add_option(
+ '--disable-stderrlog', action='store_true', default=False)
+ opt_parser.add_option(
'-d', '--daemonize', action='store_true', default=False)
opt_parser.add_option(
'--disable-locking', action='store_true', default=False)
@@ -55,6 +59,8 @@ def main():
'--pid-file', default=None)
opt_parser.add_option(
'--sink', choices=['print', 'null'], default=None)
+ opt_parser.add_option(
+ '--batch-size', type='int', default=None)
(opts, args) = opt_parser.parse_args()
@@ -66,7 +72,7 @@ def main():
paths = args[1:]
log_level = getattr(logging, opts.log_level.upper())
- configure_logging(log_level, True, opts.enable_syslog)
+ configure_logging(log_level, not opts.disable_stderrlog, opts.enable_syslog)
consumer_paths = [
os.path.abspath(os.path.expanduser(os.path.expandvars(consumer_path)))
@@ -84,13 +90,21 @@ def main():
not opts.disable_locking,
opts.lock_timeout,
not opts.disable_tracking,
- sink)
+ sink,
+ opts.batch_size)
if cmd in ('s', 'seed'):
slurp.seed(paths or sys.stdin, conf)
elif cmd in ('m', 'monitor'):
- slurp.monitor(paths or sys.stdin, conf,
- daemonize=opts.daemonize, pid_file=opts.pid_file)
+ if opts.daemonize:
+ if opts.pid_file:
+ pid_file = lockfile.FileLock(opts.pid_file)
+ else:
+ pid_file = None
+ with daemon.DaemonContext(pidfile=pid_file):
+ slurp.monitor(paths, conf)
+ else:
+ slurp.monitor(paths or sys.stdin, conf)
elif cmd in ('e', 'eat'):
slurp.eat(paths or sys.stdin, conf)
View
45 slurp.py
@@ -5,6 +5,7 @@
from lockfile import FileLock
import logging
import os
+import pprint
import time
import pyinotify
@@ -20,12 +21,14 @@ def __init__(self,
state_path, consumer_paths,
locking, lock_timeout,
tracking,
- event_sink):
+ event_sink,
+ batch_size):
self.state_path = state_path
self.lock_class = FileLock if locking else DummyLock
self.lock_timeout = lock_timeout
self.tracker_class = Tracker if tracking else DummyTracker
self.event_sink = event_sink
+ self.batch_size = batch_size
self.consumers = self._load_consumers(consumer_paths)
def _load_consumers(self, paths):
@@ -55,6 +58,8 @@ def _import_consumers(self, file_path):
return imp.load_source('', file_path).CONSUMERS
def _create_consumer(self, **kwargs):
+ logger.debug('creating consumer %s:\n%s',
+ kwargs['name'], pprint.pformat(kwargs))
patterns = kwargs.pop('patterns')
block_terminal = kwargs.pop('block_terminal', '\n')
block_preamble = kwargs.pop('block_preamble', None)
@@ -68,11 +73,17 @@ def _create_consumer(self, **kwargs):
kwargs['block_parser'] = block_parser
file_path = os.path.join(self.state_path, kwargs['name'] + '.track')
kwargs['tracker'] = self.tracker_class(file_path)
- file_path = os.path.join(self.state_path, kwargs['name'] + '.lock')
+ file_path = os.path.join(self.state_path, kwargs['name'])
kwargs['lock'] = self.lock_class(file_path)
if self.event_sink:
+ logger.debug('overriding consumer %s event sink to %s',
+ kwargs['name'], self.event_sink.__name__)
kwargs['event_sink'] = self.event_sink
kwargs['lock_timeout'] = self.lock_timeout
+ if self.batch_size is not None:
+ logger.debug('overriding consumer %s batch size to %s',
+ kwargs['name'], self.batch_size)
+ kwargs['batch_size'] = self.batch_size
return patterns, Consumer(**kwargs)
def get_matching_consumers(self, file_path):
@@ -105,7 +116,9 @@ def __init__(self,
self.batch_size = batch_size
def seed(self, file_path):
- if not self.tracker.has(file_path):
+ if self.tracker.has(file_path):
+ logger.debug('%s already being tracked', file_path)
+ else:
if self.backfill:
offset = 0
else:
@@ -118,7 +131,11 @@ def seed(self, file_path):
self.tracker.save()
def eat(self, file_path):
+ logger.debug('locking %s with timeout %s',
+ self.lock.lock_file, self.lock_timeout)
self.lock.acquire(self.lock_timeout)
+ logger.debug('locked %s with timeout %s',
+ self.lock.lock_file, self.lock_timeout)
try:
if not self.tracker.has(file_path):
if self.backfill:
@@ -142,6 +159,11 @@ def eat(self, file_path):
num_events += 1
event = self.event_parser(
file_path, offet_b, offset_e, raw)
+ if event is None:
+ logger.warning(
+ 'consumer %s parser returned nothing for:\n%s',
+ self.name, raw)
+ continue
events.append(event)
if not self.batch_size:
self.event_sink(events[0])
@@ -164,6 +186,7 @@ def eat(self, file_path):
finally:
self.tracker.save()
finally:
+ logger.debug('unlocking %s', self.lock.lock_file)
self.lock.release()
def track(self, file_path):
@@ -258,7 +281,7 @@ def remove_dir(self, dir_path):
class DummyLock(object):
def __init__(self, file_path):
- pass
+ self.lock_file = file_path
def acquire(self, timeout=None):
pass
@@ -394,15 +417,6 @@ def _parse(self, eof):
class EventParser(object):
def __call__(self, src_file, offset_b, offset_e, raw):
- """
- src_file
- offset_b
- offset_e
- {field-1}
- {field-2}
- ..
- {field-n}
- """
raise NotImplementedError()
@@ -471,8 +485,7 @@ def process_default(self, event):
consumer.untrack_dir(event.pathname)
-def monitor(paths, conf, daemonize=False, pid_file=None):
- # TODO: move daemonize stuff to slurp script (use python-daemon)
+def monitor(paths, conf):
mask = pyinotify.ALL_EVENTS
wm = pyinotify.WatchManager()
for path in paths:
@@ -483,7 +496,7 @@ def monitor(paths, conf, daemonize=False, pid_file=None):
eat([path], conf) # TODO: allow disable?
notifier = pyinotify.Notifier(wm, default_proc_fun=MonitorEvent(conf))
logger.debug('enter notification loop')
- notifier.loop(daemonize=daemonize, pid_file=pid_file)
+ notifier.loop()
logger.debug('exit notification loop')
Please sign in to comment.
Something went wrong with that request. Please try again.