Permalink
Browse files

initial commit for syslogd frontend

  • Loading branch information...
gleicon committed Dec 19, 2010
1 parent 5e52772 commit 0635f3792aec75e6f439a0e582773721d0736685
Showing with 145 additions and 2 deletions.
  1. +1 −1 README.RestMQ
  2. +18 −0 README.syslogd
  3. +1 −1 restmq/core.py
  4. +60 −0 restmq/syslogd.py
  5. +32 −0 syslogd_restmq_server
  6. +33 −0 twisted/plugins/syslogd_plugin.py
View
@@ -27,7 +27,7 @@ There are a few http routes:
/queue - entry point for the JSON protocol (see PROTOCOL). Point your browser to this location for some examples
/c/queuename - entry point for a experimental COMET light consumer.
/p/queuename - entry point for queue policy management - see README.qp for details
- /stats[queuename] - misc statuses and queue list - if queuename is given, shows len and status (paused|started)
+ /stats/[queuename] - misc statuses and queue list - if queuename is given, shows len and status (paused|started)
/j/queuename - basic job checking, for those implementing job schedulers. Point your browser here to check data about a queue
/control/queuename - start/stop consumers (as in pausing them)
View
@@ -0,0 +1,18 @@
+RestMQ based syslogd daemon - Experimental protocol adapter for receiveing high volume syslog messages
+
+- TCP only
+- each posting host has a queue named syslogd:<ip>
+- run it in a standalone restmq, for N servers
+- consume your results of a restmq instance bound to the same redis database
+- default port 25000
+- performs lookup of severity and facility
+
+
+Use syslogd_restmq_server to run the frontend.
+
+Bind on /c/syslogd:<hostname> for receiving thru the comet endpoint:
+
+- curl http://localhost:8888/c/syslogd:127.0.0.1
+
+
+Gleicon - 2010
View
@@ -242,7 +242,7 @@ def queue_tail(self, queue, keyno=10, delete_obj=False):
else:
v = yield self.redis.get(t)
- multivalue.append({'key': t, 'value':v.encode('utf-8')})
+ multivalue.append({'key': okey, 'value':v.encode('utf-8')})
qpkey = "%s:queuepolicy" % queue
policy = yield self.redis.get(qpkey)
View
@@ -0,0 +1,60 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# launchctl unload /System/Library/LaunchDaemons/com.apple.syslogd.plist
+# launchctl load /System/Library/LaunchDaemons/com.apple.syslogd.plist
+
+from twisted.internet import reactor, defer
+from twisted.internet.protocol import Protocol, ServerFactory
+from twisted.protocols.basic import LineReceiver
+import time, re, math, json, os
+from restmq import core
+import cyclone.redis
+
+
+#<22>Nov 1 00:12:04 gleicon-vm1 postfix/smtpd[4880]: connect from localhost[127.0.0.1]
+severity = ['emerg', 'alert', 'crit', 'err', 'warn', 'notice', 'info', 'debug', ]
+
+facility = ['kern', 'user', 'mail', 'daemon', 'auth', 'syslog', 'lpr', 'news',
+ 'uucp', 'cron', 'authpriv', 'ftp', 'ntp', 'audit', 'alert', 'at', 'local0',
+ 'local1', 'local2', 'local3', 'local4', 'local5', 'local6', 'local7',]
+
+fs_match = re.compile("<(.+)>(.*)", re.I)
+
+class SyslogdProtocol(LineReceiver):
+ delimiter = '\n'
+ def connectionMade(self):
+ print 'Connection from %r' % self.transport
+
+ def lineReceived(self, line):
+ host = self.transport.getHost().host
+ queue_name = "syslogd:%s" % host
+ k = {}
+ k['line'] = line.strip()
+ (fac, sev) = self._calc_lvl(k['line'])
+ k['host'] = host
+ k['tstamp'] = time.time()
+ k['facility'] = fac
+ k['severity'] = sev
+ self.factory.oper.queue_add(queue_name, json.dumps(k))
+
+ def _calc_lvl(self, line):
+ lvl = fs_match.split(line)
+ if lvl and len(lvl) > 1:
+ i = int(lvl[1])
+ fac = int(math.floor(i / 8))
+ sev = i - (fac * 8)
+ return (facility[fac], severity[sev])
+ return (None, None)
+
+class SyslogdFactory(ServerFactory):
+ protocol = SyslogdProtocol
+
+ def __init__ (self, redis_host, redis_port, redis_pool, redis_db ):
+
+ db = cyclone.redis.lazyRedisConnectionPool(
+ redis_host, redis_port,
+ pool_size=redis_pool, db=redis_db)
+
+ self.oper = core.RedisOperations(db)
+
View
@@ -0,0 +1,32 @@
+#!/bin/bash
+# RestMQ
+
+# process' user and group
+USER=www-data
+GROUP=www-data
+
+# twisted reactor (select, poll, epoll, kqueue)
+REACTOR=epoll
+
+# pidfile and logfile
+PIDFILE=syslogd_restmq.pid
+LOGFILE=syslogd_restmq.log
+
+# redis host, port and connection pool size
+REDIS_HOST=127.0.0.1
+REDIS_PORT=6379
+REDIS_POOL=10
+
+# server port, and interface to listen on
+PORT=25000
+#LISTEN=127.0.0.1
+LISTEN=10.0.1.4
+
+# run in foreground, for testing
+exec twistd --pidfile=$PIDFILE --logfile=$LOGFILE -n syslogd $* --listen=$LISTEN
+
+# run in background, for production
+# export PYTHONPATH=`dirname $0`:$PYTHONPATH
+# exec twistd --pidfile=$PIDFILE --logfile=$LOGFILE --reactor=$REACTOR --uid=$USER --gid=$GROUP \
+# restmq --redis-host=$REDIS_HOST --redis-port=$REDIS_PORT --redis-pool=$REDIS_POOL \
+# --port=$PORT --listen=$LISTEN
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+from zope.interface import implements
+from twisted.python import usage
+from twisted.plugin import IPlugin
+from twisted.application import service, internet
+
+import restmq.syslogd
+
+class Options(usage.Options):
+ optParameters = [
+ ["redis-host", "", "127.0.0.1", "hostname or ip address of the redis server"],
+ ["redis-port", "", 6379, "port number of the redis server", int],
+ ["redis-pool", "", 10, "connection pool size", int],
+ ["redis-db", "", 0, "redis database", int],
+ ["port", "", 25000, "port number to listen on", int],
+ ["listen", "", "127.0.0.1", "interface to listen on"],
+ ]
+
+class ServiceMaker(object):
+ implements(service.IServiceMaker, IPlugin)
+ tapname = "syslogd"
+ description = "Syslogd RESTful Message Broker"
+ options = Options
+
+ def makeService(self, options):
+ return internet.TCPServer(options["port"],
+ restmq.syslogd.SyslogdFactory(options["redis-host"], options["redis-port"],
+ options["redis-pool"], options["redis-db"]),
+ interface=options["listen"])
+
+serviceMaker = ServiceMaker()

0 comments on commit 0635f37

Please sign in to comment.