Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

Merge pull request #71 from monkeymantra/master

Feature addition - Multiple VHost monitoring
  • Loading branch information...
commit 6d3976606658995e7a1cc6a6f5598ad6ed03faf6 2 parents 6777c54 + 1a6a2d8
@jbuchbinder jbuchbinder authored
View
10 rabbit/README.mkdn
@@ -5,12 +5,18 @@ python module for ganglia 3.1.
"rabbit" sends metrics on RabbitMQ nodes using the stats api. It is based on the very similar ElasticSearch module.
-http://(node-ip):55672/api/queues (or nodes)
+http://(node-ip):55672/api/queues (or nodes, or exchanges)
-This module requires simplejson, or if using a 2.6 interpreter with mod_python, json. Modify accordingly.
+Please see http://hg.rabbitmq.com/rabbitmq-management/raw-file/rabbitmq_v2_7_1/priv/www/api/index.html for more info on the management API. That's a good place to start if you want to extend this module and include new metrics.
+
+This module requires simplejson, or if using a 2.6 interpreter with mod_python, the json module. Modify accordingly.
The digItUp function, and the keyToPath syntax, were borrowed from the ElasticSearch module.
+To use multiple vhosts, separate them with commas in the vhost parameter of the pyconf configuration file.
+
+To get metrics other than nodes or queues, look at how buildQueueDescriptors and buildNodeDescriptors are set up, write a new descriptor builder, adjust the stats list at the top of the python file, and contribute the changes — or ask for my assistance and I'll see what I can do.
+
## AUTHORS
Gregory Rice <gregrice@gmail.com>
View
2  rabbit/conf.d/rabbitmq.pyconf
@@ -13,7 +13,7 @@ modules {
}
param vhost {
- value = "/"
+ value = "/,vhost1,vhost2"
}
param username {
value = "guest"
View
135 rabbit/python_modules/rabbitmq.py
@@ -5,20 +5,23 @@
import urllib
import time
from string import Template
+import itertools
global url, descriptors, last_update, vhost, username, password, url_template, result, result_dict, keyToPath
INTERVAL = 20
descriptors = list()
username, password = "guest", "guest"
stats = {}
-last_update = {}
+keyToPath = {}
+last_update = None
+#last_update = {}
compiled_results = {"nodes" : None, "queues" : None, "connections" : None}
#Make initial stat test time dict
-for stat_type in ('queues', 'connections','exchanges', 'nodes'):
- last_update[stat_type] = None
-
-keyToPath = {}
+#for stat_type in ('queues', 'connections','exchanges', 'nodes'):
+# last_update[stat_type] = None
+### CONFIGURATION SECTION ###
+STATS = ['nodes', 'queues']
# QUEUE METRICS #
keyToPath['rmq_messages_ready'] = "%s.messages_ready"
@@ -71,41 +74,37 @@ def dig_it_up(obj,path):
print "Exception"
return False
-def refreshGroup(group):
-
+def refreshStats(stats = ('nodes', 'queues'), vhosts = ['/']):
global url_template
- urlstring = url_template.safe_substitute(stats = group)
-
global last_update, url, compiled_results
now = time.time()
- if not last_update[group]:
+
+ if not last_update:
diff = INTERVAL
else:
- diff = now - last_update[group]
-
- if diff >= INTERVAL or not last_update[group]:
- result_dict = {}
- print "Fetching stats after %d seconds" % INTERVAL
- result = json.load(urllib.urlopen(urlstring))
- compiled_results[group] = result
- last_update[group] = now
- #Refresh dict by names. We'll probably move this elsewhere.
- if group in ('queues', 'nodes'):
- for entry in result:
- name_attribute = entry['name']
- result_dict[name_attribute] = entry
- compiled_results[group] = result_dict
-
- return compiled_results[group]
-
-def getConnectionTotal(name):
- result = refreshGroup('connections')
- return result.length()
-
-def getConnectionStats(name):
- pass
+ diff = now - last_update
+
+ if diff >= INTERVAL or not last_update:
+ print "Fetching Results after %d seconds" % INTERVAL
+ last_update = now
+ for stat in stats:
+ for vhost in vhosts:
+ if stat in ('nodes'):
+ vhost = '/'
+ result_dict = {}
+ urlstring = url_template.safe_substitute(stats = stat, vhost = vhost)
+ print urlstring
+ result = json.load(urllib.urlopen(urlstring))
+ # Rearrange results so entry is held in a dict keyed by name - queue name, host name, etc.
+ if stat in ("queues", "nodes", "exchanges"):
+ for entry in result:
+ name = entry['name']
+ result_dict[name] = entry
+ compiled_results[(stat, vhost)] = result_dict
+
+ return compiled_results
def validatedResult(value):
if not isInstance(value, bool):
@@ -113,24 +112,28 @@ def validatedResult(value):
else:
return None
-def list_queues():
- # Make a list of queues
- results = refreshGroup('queues')
- return results.keys()
+def list_queues(vhost):
+ global compiled_results
+ queues = compiled_results[('queues', vhost)].keys()
+ return queues
def list_nodes():
- results = refreshGroup('nodes')
- return results.keys()
+ global compiled_results
+ nodes = compiled_results[('nodes', '/')].keys()
+ return nodes
def getQueueStat(name):
#Split a name like "rmq_backing_queue_ack_egress_rate.access"
#handle queue names with . in them
- split_name = name.split(".")
+ print name
+ split_name, vhost = name.split("#")
+ split_name = split_name.split(".")
stat_name = split_name[0]
queue_name = ".".join(split_name[1:])
- result = refreshGroup('queues')
+ # Run refreshStats to get the result object
+ result = compiled_results[('queues', vhost)]
value = dig_it_up(result, keyToPath[stat_name] % queue_name)
print name, value
@@ -145,9 +148,11 @@ def getQueueStat(name):
def getNodeStat(name):
#Split a name like "rmq_backing_queue_ack_egress_rate.access"
- stat_name, node_name = name.split(".")
- result = refreshGroup('nodes')
+ stat_name = name.split(".")[0]
+ node_name, vhost = name.split(".")[1].split("#")
+ result = compiled_results[('nodes', '/')]
value = dig_it_up(result, keyToPath[stat_name] % node_name)
+
print name,value
#Convert Booleans
if value is True:
@@ -156,24 +161,35 @@ def getNodeStat(name):
value = 0
return float(value)
+
+def product(*args, **kwds):
+ # replacement for itertools.product
+ # product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy
+ pools = map(tuple, args) * kwds.get('repeat', 1)
+ result = [[]]
+ for pool in pools:
+ result = [x+[y] for x in result for y in pool]
+ for prod in result:
+ yield tuple(prod)
def metric_init(params):
''' Create the metric definition object '''
- global descriptors, stats, vhost, username, password, urlstring, url_template, compiled_results
+ global descriptors, stats, vhost, username, password, urlstring, url_template, compiled_results, STATS
print 'received the following params:'
#Set this globally so we can refresh stats
if 'host' not in params:
params['host'], params['vhost'],params['username'],params['password'] = "localhost", "/", "guest", "guest"
- vhost = params['vhost']
+
+ # Set the vhosts as a list split from params
+ vhosts = params['vhost'].split(',')
username, password = params['username'], params['password']
host = params['host']
- url = 'http://%s:%s@%s:55672/api/$stats' % (username, password, host)
+ url = 'http://%s:%s@%s:55672/api/$stats/$vhost' % (username, password, host)
url_template = Template(url)
print params
- refreshGroup("nodes")
- refreshGroup("queues")
+ refreshStats(stats = STATS, vhosts = vhosts)
def create_desc(prop):
d = {
@@ -194,9 +210,10 @@ def create_desc(prop):
def buildQueueDescriptors():
- for queue in list_queues():
- for metric in QUEUE_METRICS:
- name = "%s.%s" % (metric, queue)
+ for vhost, metric in product(vhosts, QUEUE_METRICS):
+ queues = list_queues(vhost)
+ for queue in queues:
+ name = "%s.%s#%s" % (metric, queue, vhost)
print name
d1 = create_desc({'name': name.encode('ascii','ignore'),
'call_back': getQueueStat,
@@ -210,10 +227,9 @@ def buildQueueDescriptors():
descriptors.append(d1)
def buildNodeDescriptors():
- for node in list_nodes():
- #node = node.split('@')[0]
- for stat in NODE_METRICS:
- name = '%s.%s' % (stat, node)
+ for metric in NODE_METRICS:
+ for node in list_nodes():
+ name = '%s.%s#%s' % (metric, node, '/')
print name
d2 = create_desc({'name': name.encode('ascii','ignore'),
'call_back': getNodeStat,
@@ -241,9 +257,8 @@ def metric_cleanup():
url_template = Template(url)
parameters = {"vhost":"/", "username":"guest","password":"guest", "metric_group":"rabbitmq"}
metric_init(parameters)
- result = refreshGroup('queues')
- node_result = refreshGroup('nodes')
+ result = refreshStats(stats = ('queues', 'nodes'), vhosts = ('/'))
print '***'*10
- getQueueStat('rmq_backing_queue_ack_egress_rate.gelf_client_three')
- getNodeStat('rmq_disk_free.rmqtwo@inrmq02d1')
- getNodeStat('rmq_mem_used.rmqtwo@inrmq02d1')
+ getQueueStat('rmq_backing_queue_ack_egress_rate.gelf_client_three#/')
+ getNodeStat('rmq_disk_free.rmqtwo@inrmq02d1#/')
+ getNodeStat('rmq_mem_used.rmqtwo@inrmq02d1#/')
Please sign in to comment.
Something went wrong with that request. Please try again.