Browse files

initial

  • Loading branch information...
0 parents commit 25ee89ac43872e4e0d25902bd155827e35d382e7 Andrew committed Jan 12, 2012
Showing with 790 additions and 0 deletions.
  1. +4 −0 .gitignore
  2. +19 −0 COPYING
  3. +5 −0 MANIFEST.in
  4. +21 −0 README.md
  5. +119 −0 contrib/slurp
  6. +38 −0 setup.py
  7. +93 −0 slurp
  8. +491 −0 slurp.py
4 .gitignore
@@ -0,0 +1,4 @@
+.project
+.pydevproject
+pkg
+slurp.egg-info/
19 COPYING
@@ -0,0 +1,19 @@
+Copyright (c) 2012 Noone Nowhere <noone@nowhere.org>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
5 MANIFEST.in
@@ -0,0 +1,5 @@
+include COPYING
+include MANIFEST.in
+include README.md
+include *.py
+include slurp
21 README.md
@@ -0,0 +1,21 @@
+# Slurp
+
+* License : MIT
+* Project URL : [https://github.com/bninja/slurp](https://github.com/bninja/slurp)
+
+
+## Dependencies
+
+* pyinotify >= 0.9.3
+* Python >= 2.5
+
+
+## Install
+
+## Usage
+
+### Seed
+
+### Eat
+
+### Monitor
119 contrib/slurp
@@ -0,0 +1,119 @@
+#!/bin/sh
+#
+# slurp - this script starts and stops the slurp daemon
+#
+# chkconfig: - 85 15
+# description: slurp
+# processname: slurp
+# config: /etc/sysconfig/slurp
+# pidfile: /var/run/slurp.pid
+# lockfile: /var/lock/subsys/slurp
+
+# source function library.
+. /etc/rc.d/init.d/functions
+
+# source networking configuration.
+. /etc/sysconfig/network
+
+# check that networking is up.
+[ "$NETWORKING" = "no" ] && exit 0
+
+NAME=slurp
+USER=root
+PROG="slurp"
+PID_FILE="/var/run/slurp.pid"
+LOCK_FILE="/var/lock/subsys/slurp"
+CONFIG="/etc/sysconfig/slurp"
+PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
+
+MONITOR_PATHS="/var/log/messages"
+STATE_PATH="/var/lib/slurp"
+CONSUMERS="/etc/slurp.d"
+LOG_LEVEL="info"
+
+if [ -f $CONFIG ]; then
+ . $CONFIG
+fi
+
+OPTS="monitor \
+$MONITOR_PATHS \
+--state-path=$STATE_PATH \
+--consumer=$CONSUMERS \
+--log-level=$LOG_LEVEL \
+--daemonize --pid-file=$PID_FILE \
+--enable-syslog"
+
start() {
    # Refuse to start if the slurp executable is not present/executable.
    [ -x $PROG ] || exit 5
    echo -n $"Starting $NAME: "
    # BUG FIX: the script defines PID_FILE/LOCK_FILE but previously expanded
    # the undefined $PIDFILE/$LOCKFILE, so daemon ran without a pid file and
    # the subsys lock was written to the current directory root.
    if [ "$(id -u)" != "0" ]; then
        daemon --pidfile $PID_FILE $PROG $OPTS
    else
        daemon --pidfile $PID_FILE --user $USER $PROG $OPTS
    fi
    retval=$?
    echo
    # Record the subsys lock so status/shutdown tooling sees the service.
    [ $retval -eq 0 ] && touch $LOCK_FILE
    return $retval
}
+
stop() {
    echo -n $"Stopping $NAME: "
    # BUG FIX: use the defined PID_FILE/LOCK_FILE variables instead of the
    # undefined $PIDFILE/$LOCKFILE.
    killproc -p $PID_FILE $NAME
    retval=$?
    echo
    # Drop the subsys lock on clean shutdown.
    [ $retval -eq 0 ] && rm -f $LOCK_FILE
    return $retval
}
+
# Full stop/start cycle; also used by force-reload.
restart() {
    stop
    start
}
+
reload() {
    echo -n $"Reloading $NAME: "
    # BUG FIX: use the defined PID_FILE variable ($PIDFILE was undefined).
    # SIGHUP asks the daemon to re-read its configuration.
    killproc -p $PID_FILE $NAME -HUP
    retval=$?
    echo
    return $retval
}
+
# LSB force-reload: slurp has no lighter reload path, so do a full restart.
force_reload() {
    restart
}
+
rh_status() {
    # Delegate to the RH functions `status` helper using our pid file.
    # BUG FIX: $PIDFILE was undefined; the defined variable is PID_FILE.
    status -p $PID_FILE $NAME
}
+
# Quiet status check: exit code only, all output suppressed.
rh_status_q() {
    rh_status >/dev/null 2>&1
}
+
case "$1" in
    start)
        # Already running? Nothing to do.
        rh_status_q && exit 0
        $1
        ;;
    stop)
        # Not running? Nothing to do.
        rh_status_q || exit 0
        $1
        ;;
    restart)
        $1
        ;;
    reload)
        $1
        ;;
    force-reload)
        force_reload
        ;;
    status)
        rh_status
        ;;
    cond-restart|try-restart)
        rh_status_q || exit 0
        # BUG FIX: the branch previously fell through without restarting.
        restart
        ;;
    *)
        echo $"Usage: $0 {start|stop|status|restart|reload|cond-restart|try-restart|force-reload}"
        exit 2
esac
38 setup.py
@@ -0,0 +1,38 @@
+from setuptools import setup
+
+
__version__ = '0.0.1'


setup(
    name='slurp',
    version=__version__,
    description='Log file slurper',
    author='noone',
    author_email='noone@nowhere.com',
    url='https://github.com/bninja/slurp',
    keywords=[
        'slurp',
    ],
    # Minimum versions, matching the README ("pyinotify >= 0.9.3").
    # Exact `==` pins would reject compatible bug-fix releases.
    install_requires=[
        'pyinotify>=0.9.3',
        'lockfile>=0.9.1',
    ],
    py_modules=[
        'slurp',
    ],
    scripts=[
        'slurp',
    ],
    classifiers=[
        'Development Status :: 4 - Beta',
        'Intended Audience :: Developers',
        'License :: OSI Approved :: MIT License',
        'Operating System :: OS Independent',
        'Programming Language :: Python',
        'Programming Language :: Python :: 2.5',
        'Programming Language :: Python :: 2.6',
        'Programming Language :: Python :: 2.7',
        'Topic :: Software Development :: Libraries :: Python Modules',
    ],
)
93 slurp
@@ -0,0 +1,93 @@
+#!/usr/bin/env python
+import logging
+import logging.handlers
+from optparse import OptionParser
+import os
+import sys
+import tempfile
+
+import slurp
+
+
+logger = logging.getLogger('slurp')
+
+
def configure_logging(level, enable_stderr, enable_syslog):
    """Set the root logger level and attach stderr/syslog handlers.

    :param level: logging level (e.g. ``logging.INFO``) for the root logger.
    :param enable_stderr: attach a timestamped StreamHandler on stderr.
    :param enable_syslog: attach a SysLogHandler writing to ``/dev/log``.
    """
    root = logging.getLogger('')
    root.setLevel(level)
    if enable_stderr:
        stderr_handler = logging.StreamHandler(sys.stderr)
        stderr_handler.setFormatter(logging.Formatter(
            '%(asctime)s : %(levelname)s : %(name)s : %(message)s'))
        root.addHandler(stderr_handler)
    if enable_syslog:
        syslog_handler = logging.handlers.SysLogHandler('/dev/log')
        syslog_handler.setFormatter(
            logging.Formatter('slurp[%(process)d]: %(message)s'))
        root.addHandler(syslog_handler)
+
+
def main():
    """CLI entry point: parse options and dispatch to slurp seed/monitor/eat.

    Paths may be given as arguments; when omitted they are read one per
    line from stdin.
    """
    opt_parser = OptionParser(usage="""
%prog s|seed path-1 .. path-n [options]
%prog m|monitor path-1 .. path-n [options]
%prog e|eat path-1 .. path-n [options]\
""")
    opt_parser.add_option(
        '-s', '--state-path', default=tempfile.gettempdir())
    opt_parser.add_option(
        '-c', '--consumer', dest='consumers', action='append', default=[])
    opt_parser.add_option(
        '-l', '--log-level', choices=['debug', 'info', 'warn', 'error'],
        default='warn')
    opt_parser.add_option(
        '--enable-syslog', action='store_true', default=False)
    opt_parser.add_option(
        '-d', '--daemonize', action='store_true', default=False)
    opt_parser.add_option(
        '--disable-locking', action='store_true', default=False)
    opt_parser.add_option(
        '--lock-timeout', type='int', default=None)
    opt_parser.add_option(
        '--disable-tracking', action='store_true', default=False)
    opt_parser.add_option(
        '--pid-file', default=None)
    opt_parser.add_option(
        '--print-sink', action='store_true', default=False)

    (opts, args) = opt_parser.parse_args()

    if not args:
        raise Exception(opt_parser.get_usage())
    cmd = args[0]
    if cmd not in ('s', 'seed', 'm', 'monitor', 'e', 'eat'):
        raise Exception(opt_parser.get_usage())
    paths = args[1:]

    log_level = getattr(logging, opts.log_level.upper())
    configure_logging(log_level, True, opts.enable_syslog)

    # Normalize consumer config paths (~, $VARS, relative parts).
    consumer_paths = [
        os.path.abspath(os.path.expanduser(os.path.expandvars(consumer_path)))
        for consumer_path in opts.consumers]

    conf = slurp.Conf(
        opts.state_path,
        consumer_paths,
        not opts.disable_locking,
        opts.lock_timeout,
        not opts.disable_tracking,
        # BUG FIX: print_sink is defined in the slurp module; the bare name
        # raised NameError whenever --print-sink was passed.
        slurp.print_sink if opts.print_sink else None)

    if cmd in ('s', 'seed'):
        slurp.seed(paths or sys.stdin, conf)
    elif cmd in ('m', 'monitor'):
        slurp.monitor(paths or sys.stdin, conf,
            daemonize=opts.daemonize, pid_file=opts.pid_file)
    elif cmd in ('e', 'eat'):
        slurp.eat(paths or sys.stdin, conf)


if __name__ == '__main__':
    main()
491 slurp.py
@@ -0,0 +1,491 @@
+import functools
+import glob
+import imp
+import json
+from lockfile import FileLock
+import logging
+import os
+import time
+
+import pyinotify
+
+
+__version__ = '0.0.1'
+
+logger = logging.getLogger(__name__)
+
+
class Conf(object):
    """Runtime configuration: wires state/lock paths, locking and tracking
    policy, and the consumers loaded from consumer definition files.
    """

    def __init__(self,
            state_path, consumer_paths,
            locking, lock_timeout,
            tracking,
            event_sink):
        # Directory where per-consumer .track and .lock files are kept.
        self.state_path = state_path
        # Disabled locking/tracking swap in no-op implementations.
        self.lock_class = FileLock if locking else DummyLock
        self.lock_timeout = lock_timeout
        self.tracker_class = Tracker if tracking else DummyTracker
        # When set, overrides each consumer's configured event sink
        # (see _create_consumer).
        self.event_sink = event_sink
        self.consumers = self._load_consumers(consumer_paths)

    def _load_consumers(self, paths):
        """Load consumer definitions from files, directories (all *.py
        inside) or glob patterns; returns [(patterns, Consumer), ...].

        Raises ValueError if two definitions share a consumer name.
        """
        consumers = []
        consumer_names = set()
        for path in paths:
            file_paths = []
            if os.path.isfile(path):
                file_paths.append(path)
            else:
                if os.path.isdir(path):
                    path = os.path.join(path, '*.py')
                # Non-file, non-dir paths are treated as glob patterns.
                file_paths += [
                    p for p in glob.glob(path)]
            for file_path in file_paths:
                for conf in self._import_consumers(file_path):
                    patterns, consumer = self._create_consumer(**conf)
                    if consumer.name in consumer_names:
                        raise ValueError('consumer %s from %s conflict' %
                            (consumer.name, file_path))
                    consumers.append((patterns, consumer))
                    consumer_names.add(consumer.name)
        return consumers

    def _import_consumers(self, file_path):
        """Import file_path as a Python module and return its CONSUMERS
        attribute (expected: iterable of definition dicts).
        """
        logger.debug('loading consumers from %s', file_path)
        # NOTE(review): every file is loaded under the empty module name,
        # so successive loads clobber each other in sys.modules -- confirm
        # this is intentional.
        return imp.load_source('', file_path).CONSUMERS

    def _create_consumer(self, **kwargs):
        """Build one Consumer from a definition dict.

        Pops 'patterns', 'block_terminal' (default '\\n') and
        'block_preamble'; injects block_parser, tracker, lock, lock_timeout
        and (when configured) event_sink, then forwards the rest to
        Consumer().
        """
        patterns = kwargs.pop('patterns')
        block_terminal = kwargs.pop('block_terminal', '\n')
        block_preamble = kwargs.pop('block_preamble', None)
        # A preamble implies multi-line blocks; otherwise one block per line.
        if block_preamble:
            block_parser = functools.partial(MultiLineIterator,
                preamble=block_preamble,
                terminal=block_terminal)
        else:
            block_parser = functools.partial(LineIterator,
                terminal=block_terminal)
        kwargs['block_parser'] = block_parser
        file_path = os.path.join(self.state_path, kwargs['name'] + '.track')
        kwargs['tracker'] = self.tracker_class(file_path)
        file_path = os.path.join(self.state_path, kwargs['name'] + '.lock')
        kwargs['lock'] = self.lock_class(file_path)
        if self.event_sink:
            kwargs['event_sink'] = self.event_sink
        kwargs['lock_timeout'] = self.lock_timeout
        return patterns, Consumer(**kwargs)

    def get_matching_consumers(self, file_path):
        """Return consumers with at least one pattern matching file_path.

        NOTE(review): a consumer whose patterns match more than once is
        appended once per matching pattern.
        """
        consumers = []
        for patterns, consumer in self.consumers:
            for pattern in patterns:
                if pattern.match(file_path):
                    logger.debug('%s matched consumer %s pattern %s',
                        file_path, consumer.name, pattern.pattern)
                    consumers.append(consumer)
        return consumers
+
+
class Consumer(object):
    """Parses blocks from tracked files and delivers them to an event sink.

    :param name: unique consumer name (also names its .track/.lock files).
    :param block_parser: callable(fo) yielding (raw, offset_b, offset_e).
    :param event_parser: callable(file_path, offset_b, offset_e, raw) -> event.
    :param event_sink: callable receiving one event (unbatched mode) or a
        list of events (batched mode).
    :param tracker: offset tracker (Tracker or DummyTracker interface).
    :param lock: inter-process lock (FileLock or DummyLock interface).
    :param lock_timeout: seconds to wait for the lock, or None.
    :param backfill: when True, newly seen files are consumed from offset 0;
        otherwise consumption starts at the current end of file.
    :param batch_size: events per sink call; falsy means one event per call.
    """

    def __init__(self,
            name,
            block_parser, event_parser, event_sink,
            tracker,
            lock, lock_timeout=None,
            backfill=True,
            batch_size=None):
        self.name = name
        self.block_parser = block_parser
        self.event_parser = event_parser
        self.event_sink = event_sink
        self.tracker = tracker
        self.lock = lock
        self.lock_timeout = lock_timeout
        self.backfill = backfill
        self.batch_size = batch_size

    def _initial_offset(self, file_path):
        """Starting offset for a newly tracked file: 0 when backfilling,
        otherwise the file's current end (skip pre-existing data).
        """
        if self.backfill:
            return 0
        with open(file_path, 'r') as fo:
            fo.seek(0, os.SEEK_END)
            return fo.tell()

    def seed(self, file_path):
        """Register file_path with the tracker without consuming anything."""
        if not self.tracker.has(file_path):
            offset = self._initial_offset(file_path)
            logger.debug('%s seeding %s with offset %s',
                self.name, file_path, offset)
            self.tracker.add(file_path, offset)

    def eat(self, file_path):
        """Consume new blocks from file_path, updating tracked offsets.

        Runs under the consumer lock; tracker state is saved even when the
        sink or parser raises.
        """
        self.lock.acquire(self.lock_timeout)
        try:
            if not self.tracker.has(file_path):
                self.tracker.add(file_path, self._initial_offset(file_path))
            offset = self.tracker.get(file_path)
            try:
                st = time.time()
                # renamed from `bytes` which shadowed the builtin
                num_bytes = 0
                num_events = 0
                with open(file_path, 'r') as fo:
                    fo.seek(offset, os.SEEK_SET)
                    events = []
                    offset_e = offset
                    for raw, offset_b, offset_e in self.block_parser(fo):
                        num_bytes += len(raw)
                        num_events += 1
                        event = self.event_parser(
                            file_path, offset_b, offset_e, raw)
                        events.append(event)
                        if not self.batch_size:
                            # Unbatched: deliver each event individually and
                            # advance the tracked offset immediately.
                            self.event_sink(events[0])
                            self.tracker.update(file_path, offset_e)
                            del events[:]
                        elif len(events) >= self.batch_size:
                            logger.info('%s eating %s events',
                                self.name, len(events))
                            self.event_sink(events)
                            self.tracker.update(file_path, offset_e)
                            del events[:]
                    if events:
                        # Flush the final partial batch.
                        self.event_sink(events)
                        self.tracker.update(file_path, offset_e)
                        del events[:]
                et = time.time()
                logger.info(
                    '%s ate %s events (%s bytes) from %s in %0.4f sec(s)',
                    self.name, num_events, num_bytes, file_path, et - st)
            finally:
                self.tracker.save()
        finally:
            self.lock.release()

    def track(self, file_path):
        """Start tracking file_path from offset 0."""
        self.tracker.add(file_path)

    def untrack(self, file_path):
        """Stop tracking file_path."""
        self.tracker.remove(file_path)

    def untrack_dir(self, dir_path):
        """Stop tracking every file under dir_path."""
        self.tracker.remove_dir(dir_path)
+
+
class DummyTracker(object):
    """No-op tracker used when offset tracking is disabled.

    Mirrors the Tracker interface: nothing is persisted, no file is ever
    reported as tracked, and every offset reads back as 0.
    """

    def __init__(self, file_path):
        self.file_path = file_path
        self.load()

    def load(self):
        """Nothing to load."""
        pass

    def save(self):
        """Nothing to persist."""
        pass

    def add(self, file_path, offset=0):
        """Ignore the new file."""
        pass

    def update(self, file_path, offset):
        """Ignore the new offset."""
        pass

    def get(self, file_path):
        """Every file starts from offset 0."""
        return 0

    def has(self, file_path):
        """No file is ever tracked."""
        return False

    def remove(self, file_path):
        """Nothing to remove."""
        pass

    def remove_dir(self, dir_path):
        """Nothing to remove."""
        pass
+
+
class Tracker(object):
    """Persists per-file byte offsets as JSON so consumption resumes
    across runs.
    """

    def __init__(self, file_path):
        # file_path is the JSON state file, not a log file being tracked.
        self.file_path = file_path
        self.load()

    def load(self):
        """Load offsets from the state file, or start empty."""
        if os.path.isfile(self.file_path):
            logger.debug('loading tracking data from %s', self.file_path)
            with open(self.file_path, 'r') as fo:
                raw = fo.read()
            self.file_offsets = json.loads(raw)
        else:
            logger.debug('not tracking data %s', self.file_path)
            self.file_offsets = {}

    def save(self):
        """Write all offsets to the state file."""
        logger.debug('saving tracking data to %s', self.file_path)
        with open(self.file_path, 'w') as fo:
            fo.write(json.dumps(self.file_offsets))

    def add(self, file_path, offset=0):
        """Begin tracking file_path; raises ValueError if already tracked."""
        logger.info('tracking add %s offset %s', file_path, offset)
        if file_path in self.file_offsets:
            raise ValueError('%s already tracked' % file_path)
        self.file_offsets[file_path] = offset

    def update(self, file_path, offset):
        """Record the new offset for file_path."""
        logger.debug('tracking update %s to %s', file_path, offset)
        self.file_offsets[file_path] = offset

    def get(self, file_path):
        """Return the tracked offset; raises KeyError if untracked."""
        return self.file_offsets[file_path]

    def has(self, file_path):
        """True if file_path is being tracked."""
        return file_path in self.file_offsets

    def remove(self, file_path):
        """Stop tracking file_path; raises KeyError if untracked."""
        logger.info('tracking remove %s', file_path)
        del self.file_offsets[file_path]

    def remove_dir(self, dir_path):
        """Stop tracking every file whose path starts with dir_path."""
        logger.info('tracking remove dir %s', dir_path)
        # BUG FIX: snapshot the keys -- remove() mutates file_offsets while
        # we iterate (RuntimeError on Python 3 dict views).
        for file_path in list(self.file_offsets.keys()):
            if file_path.startswith(dir_path):
                self.remove(file_path)
+
+
class DummyLock(object):
    """No-op stand-in for lockfile.FileLock when locking is disabled."""

    def __init__(self, file_path):
        # The lock file path is accepted for interface parity and ignored.
        pass

    def acquire(self, timeout=None):
        """Pretend to take the lock; always succeeds immediately."""
        pass

    def release(self):
        """Pretend to drop the lock."""
        pass
+
+
def get_files(path):
    """Yield path itself when it is a file, otherwise every file found
    recursively beneath it.
    """
    if os.path.isfile(path):
        yield path
        return
    for dir_path, _sub_dirs, names in os.walk(path):
        for name in names:
            yield os.path.join(dir_path, name)
+
+
class BlockIterator(object):
    """Iterates (raw, offset_b, offset_e) blocks parsed from a file object.

    Subclasses implement _parse(eof), which either consumes one block from
    self.buffer (advancing self.pos) and returns it, or returns None when
    more data is needed.

    :param fo: file object positioned at the starting offset.
    :param strict: raise on a trailing partial block instead of discarding.
    :param read_size: bytes to read per underlying read() call.
    """

    def __init__(self, fo, strict=False, read_size=2048):
        self.fo = fo
        self.pos = fo.tell()
        self.strict = strict
        self.read_size = read_size
        self.buffer = ''
        self.eof = False

    def __iter__(self):
        return self

    def next(self):
        """Return the next (raw, offset_b, offset_e) block."""
        if self.buffer:
            result = self._parse(self.eof)
            if result:
                return result
        while not self.eof:
            buffer = self.fo.read(self.read_size)
            # A short read means the file is drained for now.
            self.eof = (len(buffer) != self.read_size)
            self.buffer += buffer
            result = self._parse(self.eof)
            if not result:
                continue
            raw, offset_b, offset_e = result
            return raw, offset_b, offset_e
        if self.buffer:
            # BUG FIX: the message was passed as extra ValueError args and
            # never interpolated.
            if self.strict:
                raise ValueError('%s[%s:] is partial block' %
                    (self.fo.name, self.pos))
            logger.warning('%s[%s:] is partial block, discarding',
                self.fo.name, self.pos)
            # BUG FIX: actually discard the partial data so repeated next()
            # calls do not re-warn about the same bytes.
            self.buffer = ''
        raise StopIteration()

    # Python 3 iterator protocol (the file otherwise targets Python 2).
    __next__ = next
+
+
class LineIterator(BlockIterator):
    """Block iterator where each block is a single line ending with the
    given terminal string.
    """

    def __init__(self, fo, terminal, **kwargs):
        super(LineIterator, self).__init__(fo, **kwargs)
        self.terminal = terminal

    def _parse(self, eof):
        end = self.buffer.find(self.terminal)
        if end == -1:
            # No complete line buffered yet.
            return None
        end += len(self.terminal)
        block = (self.buffer[:end], self.pos, self.pos + end)
        self.buffer = self.buffer[end:]
        self.pos += end
        return block
+
+
class MultiLineIterator(BlockIterator):
    """Block iterator for multi-line blocks: a block starts at a preamble
    regex match and runs until the next preamble that immediately follows
    a terminal string (or until EOF).
    """

    def __init__(self, fo, preamble, terminal, **kwargs):
        super(MultiLineIterator, self).__init__(fo, **kwargs)
        # preamble: compiled regex marking block starts -- TODO confirm.
        self.preamble = preamble
        self.terminal = terminal

    def _parse(self, eof):
        match = self.preamble.search(self.buffer)
        if not match:
            logger.debug('%s[%s:%s] has no preamble', self.fo.name,
                self.pos, self.pos + len(self.buffer))
            return None
        if match.start() != 0:
            # Data before the first preamble can never form a block.
            # BUG FIX: the message was passed as extra ValueError args and
            # never interpolated.
            if self.strict:
                raise ValueError('%s[%s:%s] is partial block' %
                    (self.fo.name, self.pos, self.pos + match.start()))
            logger.warning('%s[%s:%s] is partial block, discarding',
                self.fo.name, self.pos, self.pos + match.start())
        self.buffer = self.buffer[match.start():]
        self.pos += match.start()
        logger.debug('%s[%s:] has preamble', self.fo.name, self.pos)
        next = match
        while True:
            prev = next
            next = self.preamble.search(self.buffer, prev.end())
            if not next:
                logger.debug('%s[%s:] contains no preamble',
                    self.fo.name, self.pos + prev.end())
                break
            # Only a preamble immediately preceded by the terminal starts a
            # new block; otherwise it is payload of the current block.
            prefix = self.buffer[
                next.start() - len(self.terminal):next.start()]
            if prefix == self.terminal:
                logger.debug('%s[%s:] contains terminal-prefixed preamble',
                    self.fo.name, self.pos + next.end())
                break
            logger.debug('%s[%s:] contains non-terminal-prefixed preamble',
                self.fo.name, self.pos + next.end())
        if next:
            logger.debug('%s[%s:%s] hit', self.fo.name, self.pos,
                self.pos + next.start())
            raw = self.buffer[:next.start()]
            self.buffer = self.buffer[next.start():]
        else:
            if not eof:
                # Wait for more data; the block may continue.
                return None
            suffix = self.buffer[-len(self.terminal):]
            if suffix != self.terminal:
                # Trailing data lacks the terminal: partial block.
                # BUG FIX: interpolate the ValueError message (see above).
                if self.strict:
                    raise ValueError('%s[%s:%s] is partial block' %
                        (self.fo.name, self.pos, self.pos + len(self.buffer)))
                logger.warning('%s[%s:%s] is partial block, discarding',
                    self.fo.name, self.pos, self.pos + len(self.buffer))
                self.pos += len(self.buffer)
                self.buffer = ''
                return None
            logger.debug('%s[%s:] hit', self.fo.name, self.pos)
            raw = self.buffer
            self.buffer = ''
        result = raw, self.pos, self.pos + len(raw)
        self.pos += len(raw)
        return result
+
+
class EventParser(object):
    """Callable interface turning a raw block into an event.

    Implementations receive the source file path, the block's begin/end
    byte offsets and the raw block text, and return an event carrying
    fields such as: src_file, offset_b, offset_e, tag, host, severity,
    timestamp, payload.
    """

    def __call__(self, src_file, offset_b, offset_e, raw):
        raise NotImplementedError()
+
+
class EventSink(object):
    """Callable interface receiving parsed events (singly or as a batch)."""

    def __call__(self, event):
        raise NotImplementedError()
+
+
def print_sink(event):
    """Event sink that writes each event to stdout.

    Accepts a single event or a list of events, matching the two shapes
    Consumer.eat delivers (unbatched vs batched).
    """
    if not isinstance(event, list):
        event = [event]
    for e in event:
        # print(e) behaves identically on Python 2 (single-arg) and works
        # on Python 3, unlike the former `print e` statement.
        print(e)
+
+
def seed(paths, conf):
    """Seed tracking state for every file found under each path in paths.

    Paths are stripped (they may come from stdin lines); each file is
    offered to every consumer whose patterns match it.
    """
    for raw_path in paths:
        logger.debug('scanning %s', raw_path)
        scan_path = raw_path.strip()
        for file_path in get_files(scan_path):
            for consumer in conf.get_matching_consumers(file_path):
                consumer.seed(file_path)
+
+
class MonitorEvent(pyinotify.ProcessEvent):
    """Routes inotify events to the consumers whose patterns match the
    affected path.
    """

    def __init__(self, conf):
        self.conf = conf
        # path -> matching consumers, so patterns are evaluated only once
        # per distinct path.
        self.cached_matches = {}

    def process_default(self, event):
        logger.debug('processing event %s', event)

        path = event.pathname
        if path not in self.cached_matches:
            self.cached_matches[path] = self.conf.get_matching_consumers(path)
        consumers = self.cached_matches[path]

        is_dir = bool(event.mask & pyinotify.IN_ISDIR)

        # file created: start tracking it
        if event.mask & pyinotify.IN_CREATE and not is_dir:
            for consumer in consumers:
                consumer.track(path)

        # file modified: consume the newly appended data
        if event.mask & pyinotify.IN_MODIFY and not is_dir:
            for consumer in consumers:
                consumer.eat(path)

        if event.mask & pyinotify.IN_DELETE:
            if is_dir:
                # directory deleted: forget everything beneath it
                for consumer in consumers:
                    consumer.untrack_dir(path)
            else:
                # file deleted: forget its offset
                for consumer in consumers:
                    consumer.untrack(path)
+
+
def monitor(paths, conf, daemonize=False, pid_file=None):
    """Watch paths with inotify and feed events to matching consumers.

    Blocks in the notification loop; optionally daemonizes, writing
    pid_file when given.
    """
    watch_manager = pyinotify.WatchManager()
    for raw_path in paths:
        watch_path = raw_path.strip()
        logger.info('monitoring %s', watch_path)
        watch_manager.add_watch(
            watch_path, pyinotify.ALL_EVENTS, rec=True, auto_add=True)
    notifier = pyinotify.Notifier(
        watch_manager, default_proc_fun=MonitorEvent(conf))
    logger.info('enter notification loop')
    notifier.loop(daemonize=daemonize, pid_file=pid_file)
    logger.info('exit notification loop')
+
+
def eat(paths, conf):
    """Consume new data from every file found under each path in paths.

    Files matching no consumer are logged and skipped.
    """
    for raw_path in paths:
        scan_path = raw_path.strip()
        for file_path in get_files(scan_path):
            consumers = conf.get_matching_consumers(file_path)
            for consumer in consumers:
                logger.debug('%s eating file %s', consumer.name, file_path)
                consumer.eat(file_path)
            if not consumers:
                logger.info('no consumers for file %s', file_path)

0 comments on commit 25ee89a

Please sign in to comment.