Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
vvuksan committed Jan 11, 2013
2 parents 093a575 + ab6308d commit ee8b9d5
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 0 deletions.
27 changes: 27 additions & 0 deletions zeromq_pub/README.rst
@@ -0,0 +1,27 @@
ZeroMQ PUB Monitor for Ganglia
==============================

This is a gmond metric-gathering module which reports a cumulative count
of messages published by ZeroMQ publishers.

Endpoints are specified as configuration parameters in zpubmon.pyconf.
Each configuration key name is used as an endpoint name and its
corresponding value as the endpoint's URI. The configuration param
`groups` is special-cased: if present, its value specifies the group
name for the metrics generated by the module. Otherwise the default
group name ('ZeroMQ') is used.

To test, invoke with one or more pairs of (endpoint name, endpoint URI)
pairs specifying ZMQ publishers to poll. For example::

$ python zpubmon.py system-events tcp://localhost:8006

Message counts will be logged to the console every five seconds.

For more information about configuring Python modules for gmond, see the
`official documentation <http://sourceforge.net/apps/trac/ganglia/wiki
/ganglia_gmond_python_modules>`_.

Copyright (c) 2012 by Ori Livneh <ori@wikimedia.org>

Licensed under the GNU General Public Licence, version 2.0 or later.
37 changes: 37 additions & 0 deletions zeromq_pub/conf.d/zpubmon.pyconf
@@ -0,0 +1,37 @@
# Sample configuration for zpubmon Ganglia module

modules {
module {
name = "zpubmon"
language = "python"
param groups {
value = "ChangeMe"
}
param server-generated-raw {
value = "tcp://127.0.0.1:8421"
}
param client-generated-raw {
value = "tcp://127.0.0.1:8422"
}
param client-generated-valid {
value = "tcp://127.0.0.1:8484"
}
}
}

collection_group {
collect_every = 10
time_threshold = 60
metric {
name = "server-generated-raw"
title = "Raw server-generated events"
}
metric {
name = "client-generated-raw"
title = "Raw client-generated events"
}
metric {
name = "client-generated-valid"
title = "Valid client-generated events"
}
}
140 changes: 140 additions & 0 deletions zeromq_pub/python_modules/zpubmon.py
@@ -0,0 +1,140 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
ZeroMQ PUB Monitor for Ganglia
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This is a gmond metric-gathering module which reports a cumulative
count of messages published by ZeroMQ publishers.
To test, invoke with one or more pairs of (endpoint name, endpoint
URI) pairs specifying ZMQ publishers to poll. For example:
$ python zpubmon.py system-events tcp://localhost:8006
See README for more details.
:copyright: (c) 2012 by Ori Livneh <ori@wikimedia.org>
:license: GNU General Public Licence 2.0 or later
"""
import errno
import logging
import sys
import threading
import time

import zmq


logging.basicConfig(format='[ZMQ] %(asctime)s %(message)s', level=logging.INFO)


def zmq_pub_mon(endpoints, counter):
"""
Measure throughput of ZeroMQ publishers.
*endpoints* is a dict that maps human-readable endpoint names to
endpoint URIs. The names are used as metric names in Ganglia and
as the ZMQ_IDENTITY of the underlying socket.
"""
ctx = zmq.Context.instance()
poller = zmq.Poller()

for name, uri in endpoints.iteritems():
logging.info('Registering %s (%s).', name, uri)
sock = ctx.socket(zmq.SUB)
sock.setsockopt(zmq.IDENTITY, name)
sock.connect(uri)
sock.setsockopt(zmq.SUBSCRIBE, '')
poller.register(sock, zmq.POLLIN)

while 1:
try:
for socket, _ in poller.poll():
socket.recv(zmq.NOBLOCK)
name = socket.getsockopt(zmq.IDENTITY)
counter[name] += 1
except zmq.ZMQError as e:
# Calls interrupted by EINTR should be re-tried.
if e.errno == errno.EINTR:
continue
raise


def metric_init(params):
"""
Initialize metrics.
Gmond invokes this method with a dict of arguments specified in
zpubmon.py. If *params* contains a `groups` key, its value is used
as the group name in Ganglia (in lieu of the default 'ZeroMQ').
Other items are interpreted as (name: URI) pairs of ZeroMQ endpoints
to monitor.
`metric_init` spawns a worker thread to monitor these endpoints and
returns a list of metric descriptors.
"""
groups = params.pop('groups', 'ZeroMQ')
counter = {name: 0 for name in params}

thread = threading.Thread(target=zmq_pub_mon, args=(params, counter))
thread.daemon = True
thread.start()

return [{
'name': name,
'value_type': 'uint',
'format': '%d',
'units': 'events',
'slope': 'positive',
'time_max': 20,
'description': 'messages published',
'groups': groups,
'call_back': counter.get,
} for name in params]


def metric_cleanup():
"""
Clean-up handler
Terminates any lingering threads. Gmond calls this function when
it is shutting down.
"""
logging.debug('Shutting down.')
for thread in threading.enumerate():
if thread.isAlive():
thread._Thread__stop() # pylint: disable=W0212


def self_test():
"""
Perform self-test.
Parses *argv* as a collection of (name, URI) pairs specifying ZeroMQ
publishers to be monitored. Message counts are polled and outputted
every five seconds.
"""
params = dict(zip(sys.argv[1::2], sys.argv[2::2]))
if not params:
print 'Usage: %s NAME URI [NAME URI, ...]' % sys.argv[0]
print 'Example: %s my-zmq-stream tcp://localhost:8006' % sys.argv[0]
sys.exit(1)

descriptors = metric_init(params)

while 1:
for descriptor in descriptors:
name = descriptor['name']
call_back = descriptor['call_back']
logging.info('%s: %s', name, call_back(name))
time.sleep(5)


if __name__ == '__main__':
self_test()

0 comments on commit ee8b9d5

Please sign in to comment.