Skip to content
Browse files

collectd queues and twisted application

  • Loading branch information...
1 parent 8852c32 commit 517a08d600c1f67954be26fbc3e94baf66b9c23e @rgaiser rgaiser committed
Showing with 199 additions and 0 deletions.
  1. +31 −0 collectd_restmq_server
  2. +29 −0 examples/test_collectd.py
  3. +104 −0 restmq/collectd.py
  4. +35 −0 twisted/plugins/collectd_plugin.py
View
31 collectd_restmq_server
@@ -0,0 +1,31 @@
+#!/bin/bash
+# RestMQ
+
+# process' user and group
+USER=www-data
+GROUP=www-data
+
+# twisted reactor (select, poll, epoll, kqueue)
+REACTOR=epoll
+
+# pidfile and logfile
+PIDFILE=/var/run/restmq.pid
+LOGFILE=/var/log/restmq.log
+
+# redis host, port and connection pool size
+REDIS_HOST=127.0.0.1
+REDIS_PORT=6379
+REDIS_POOL=10
+
+# server port, and interface to listen on
+PORT=8888
+LISTEN=127.0.0.1
+
+# run in foreground, for testing
+exec twistd -n collectd $*
+
+# run in background, for production
+# export PYTHONPATH=`dirname $0`:$PYTHONPATH
+# exec twistd --pidfile=$PIDFILE --logfile=$LOGFILE --reactor=$REACTOR --uid=$USER --gid=$GROUP \
+# restmq --redis-host=$REDIS_HOST --redis-port=$REDIS_PORT --redis-pool=$REDIS_POOL \
+# --port=$PORT --listen=$LISTEN
View
29 examples/test_collectd.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+import sys
+import cjson
+from twisted.web import client
+from twisted.python import log
+from twisted.internet import reactor
+
+class CometClient(object):
+ def write(self, content):
+ try:
+ data = cjson.decode(content)
+ data = data.get('value')
+ except Exception, e:
+ log.err("cannot decode json: %s" % str(e))
+ log.err("json is: %s" % content)
+ else:
+ log.msg("got data: %s" % repr(data))
+
+ def close(self):
+ pass
+
+if __name__ == "__main__":
+ log.startLogging(sys.stdout)
+ client.downloadPage("http://localhost:8888/c/collectd_data", CometClient())
+ client.downloadPage("http://localhost:8888/c/collectd_event", CometClient())
+# client.downloadPage("http://shrek.intranet/c/collectd_data", CometClient())
+ reactor.run()
View
104 restmq/collectd.py
@@ -0,0 +1,104 @@
+# coding: utf-8
+
+import types
+import base64
+import hashlib
+import os.path
+import functools
+import cyclone.web
+import cyclone.redis
+import cyclone.escape
+from collections import defaultdict
+from ConfigParser import ConfigParser
+
+from twisted.python import log
+from twisted.internet import task, defer, reactor
+
+from restmq import core
+from restmq import dispatch
+
+import simplejson
+import web
+
+class CollectdRestQueueHandler(web.RestQueueHandler):
+
+ @web.authorize("rest_producer")
+ @defer.inlineCallbacks
+ def post(self, queue):
+ value = self.request.body
+ if value is None:
+ raise cyclone.web.HTTPError(400)
+ if queue == 'data':
+ queue = 'collectd_data'
+ try:
+ value = value.splitlines()
+ value = list(map((lambda x: x.split(' ')[1:]),value))
+ value = simplejson.dumps(value)
+ except Exception, e:
+ log.msg("ERROR: %s" % e)
+ raise cyclone.web.HTTPError(503)
+ elif queue == 'event':
+ queue = 'collectd_event'
+ try:
+ value = value.splitlines()
+ event = value.pop()
+ value = list(map((lambda x: x.split(': ')),value[:-1]))
+ value.append(['Event',event])
+ value = simplejson.dumps(value)
+ except Exception, e:
+ log.msg("ERROR: %s" % e)
+ raise cyclone.web.HTTPError(503)
+ else:
+ raise cyclone.web.HTTPError(400)
+ callback = self.get_argument("callback", None)
+
+ try:
+ result = yield self.settings.oper.queue_add(queue, value)
+ except Exception, e:
+ log.msg("ERROR: oper.queue_add('%s', '%s') failed: %s" % (queue, value, e))
+ raise cyclone.web.HTTPError(503)
+
+ if result:
+ self.settings.comet.queue.put(queue)
+ web.CustomHandler(self, callback).finish(result)
+ else:
+ raise cyclone.web.HTTPError(400)
+
+class Application(cyclone.web.Application):
+ def __init__(self, acl_file, redis_host, redis_port, redis_pool, redis_db):
+ handlers = [
+ (r"/", web.IndexHandler),
+ (r"/q/(.*)", web.RestQueueHandler),
+ (r"/collectd/(.*)", CollectdRestQueueHandler),
+ (r"/c/(.*)", web.CometQueueHandler),
+ (r"/p/(.*)", web.PolicyQueueHandler),
+ (r"/j/(.*)", web.JobQueueInfoHandler),
+ (r"/stats/(.*)", web.StatusHandler),
+ (r"/queue", web.QueueHandler),
+ (r"/control/(.*)", web.QueueControlHandler),
+ (r"/ws/(.*)", web.WebSocketQueueHandler),
+ ]
+
+ try:
+ acl = web.ACL(acl_file)
+ except Exception, e:
+ log.msg("ERROR: Cannot load ACL file: %s" % e)
+ raise RuntimeError("Cannot load ACL file: %s" % e)
+
+ db = cyclone.redis.lazyRedisConnectionPool(
+ redis_host, redis_port,
+ pool_size=redis_pool, db=redis_db)
+
+ oper = core.RedisOperations(db)
+ cwd = os.path.dirname(__file__)
+
+ settings = {
+ "db": db,
+ "acl": acl,
+ "oper": oper,
+ "comet": web.CometDispatcher(oper),
+ "static_path": os.path.join(cwd, "static"),
+ "template_path": os.path.join(cwd, "templates"),
+ }
+
+ cyclone.web.Application.__init__(self, handlers, **settings)
View
35 twisted/plugins/collectd_plugin.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+from zope.interface import implements
+from twisted.python import usage
+from twisted.plugin import IPlugin
+from twisted.application import service, internet
+
+import restmq.collectd
+
+class Options(usage.Options):
+ optParameters = [
+ ["acl", "", "acl.conf", "acl configuration file for endpoints"],
+ ["redis-host", "", "127.0.0.1", "hostname or ip address of the redis server"],
+ ["redis-port", "", 6379, "port number of the redis server", int],
+ ["redis-pool", "", 10, "connection pool size", int],
+ ["redis-db", "", 0, "redis database", int],
+ ["port", "", 8888, "port number to listen on", int],
+ ["listen", "", "127.0.0.1", "interface to listen on"],
+ ]
+
+class ServiceMaker(object):
+ implements(service.IServiceMaker, IPlugin)
+ tapname = "collectd"
+ description = "Collectd RESTful Message Broker"
+ options = Options
+
+ def makeService(self, options):
+ return internet.TCPServer(options["port"],
+ restmq.collectd.Application(options["acl"],
+ options["redis-host"], options["redis-port"],
+ options["redis-pool"], options["redis-db"]),
+ interface=options["listen"])
+
+serviceMaker = ServiceMaker()

0 comments on commit 517a08d

Please sign in to comment.
Something went wrong with that request. Please try again.