Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

executable file 154 lines (128 sloc) 5.132 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
#!/usr/bin/env python2.6

from amqplib import client_0_8 as amqp
from xdrlib import Unpacker
import ConfigParser
import SocketServer
import socket
import sys
import threading
import time

# Maps a gmond metric packet type to a callable that reads the metric value
# from an xdrlib Unpacker.  All these types expect a metric_id struct and a
# format string first; the value follows them.
#
# Types 129 (ushort) and 130 (short) are deliberately not listed, so
# datagrams of those types are ignored by the request handler.
GANGLIA_DECODE = {
    131: lambda unpacker: unpacker.unpack_int(),
    132: lambda unpacker: unpacker.unpack_uint(),
    134: lambda unpacker: unpacker.unpack_float(),
    135: lambda unpacker: unpacker.unpack_double(),
}

# UDP port the collector binds to for incoming gmond datagrams.
GANGLIA_LISTEN_PORT = 8651

class GangliaCollector(SocketServer.BaseRequestHandler):
    """Request handler that decodes one gmond metric datagram.

    Each datagram is XDR-encoded: a packet type, then host name, metric
    name, spoof flag and format string, then the value.  Only packet types
    present in ``GANGLIA_DECODE`` are processed.  Decoded metrics are handed
    to the module-level ``graphite`` aggregator, which must exist before the
    server starts handling requests.
    """

    def handle(self):
        """Decode the datagram and record its metric; drop bad packets."""
        # Local import: the file only imports Unpacker from xdrlib.
        from xdrlib import Error as XdrError

        # The datagram payload is the first element of self.request.
        unpacker = Unpacker(self.request[0])
        try:
            metric_type = unpacker.unpack_int()
            if metric_type not in GANGLIA_DECODE:
                return

            unpacker.unpack_string()  # host (read to advance; unused)
            name = unpacker.unpack_string()
            unpacker.unpack_int()  # spoof boolean
            unpacker.unpack_string()  # format string
            value = GANGLIA_DECODE[metric_type](unpacker)
            # done() raises if unread bytes remain after the value.
            unpacker.done()
        except (EOFError, XdrError) as e:
            # Truncated or over-long datagram: skip it instead of letting
            # the exception escape the handler.
            sys.stderr.write("Dropping malformed gmond packet: %r\n" % (e,))
            return

        graphite.record_stat(name, value)

class GraphiteAggregator(object):
    """Buffer mapped metrics and periodically publish them over AMQP.

    Metrics arrive through ``record_stat``, are renamed via ``mapping`` and
    optionally converted to per-second rates, then queued.  A daemon thread
    started by the constructor flushes the queue every 10 seconds as
    newline-separated ``name.cluster.host value timestamp`` lines.
    """

    def __init__(self, host, cluster, mapping, rates, amqp_conn, amqp_exchange):
        """Store configuration, open an AMQP channel and start the flusher.

        host, cluster -- appended to every metric name when formatting.
        mapping       -- dict of incoming metric name -> outgoing name.
        rates         -- container of outgoing names reported as rates.
        amqp_conn     -- connection object providing ``channel()``.
        amqp_exchange -- exchange name passed to ``basic_publish``.
        """
        self.host = host
        self.cluster = cluster
        self.mapping = mapping
        self.use_rates = rates
        self.amqp_conn = amqp_conn
        self.amqp_exchange = amqp_exchange
        self.amqp_chan = self.amqp_conn.channel()

        # Previous sample per rate metric, used to compute deltas.
        self.last_value = {}
        self.last_time = {}
        # stats_lock guards self.stats, which is shared with the flusher.
        self.stats_lock = threading.Lock()
        self.stats = []

        update_thread = threading.Thread(target=self.send_updates_thread)
        update_thread.setDaemon(True)
        update_thread.start()

    def record_stat(self, orig_name, value):
        """Queue one sample for the next flush.

        Samples whose name is not in ``mapping`` are ignored.  For rate
        metrics the queued value is the change per second since the previous
        sample; nothing is queued for the first sample, when the value
        decreased (treated as a counter rollover), or when no time has
        elapsed since the previous sample.
        """
        if orig_name not in self.mapping:
            return
        name = self.mapping[orig_name]
        now = time.time()

        if name in self.use_rates:
            have_baseline = name in self.last_value
            previous_value = self.last_value.get(name)
            previous_time = self.last_time.get(name)
            # Every path below makes the current sample the new baseline.
            self.last_value[name] = value
            self.last_time[name] = now

            if not have_baseline:
                return
            if value < previous_value:
                # counter rollover?
                return
            elapsed = now - previous_time
            if elapsed <= 0:
                # Same timestamp or clock stepped backwards: a rate would
                # divide by zero or come out negative, so skip this sample.
                return
            value = (value - previous_value) / elapsed

        with self.stats_lock:
            self.stats.append((name, value, now))

    def send_updates_thread(self):
        """Flush queued stats every 10 seconds, forever.

        Ordinary exceptions from a flush are reported on stderr and the loop
        continues.  A ``SystemExit`` raised by ``send_updates`` terminates
        the whole process with that exit status.
        """
        # Local import: os is not imported at the top of the file.
        import os

        while True:
            time.sleep(10)
            try:
                self.send_updates()
            except SystemExit as e:
                # sys.exit() in a non-main thread only ends that thread,
                # which would leave the server running and self.stats
                # growing without bound.  Force the process down instead.
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(e.code if isinstance(e.code, int) else 1)
            except Exception as e:
                sys.stderr.write("Error sending updates: %s\n" % (e,))

    def send_updates(self):
        """Publish all queued stats as one AMQP message.

        Does nothing when the queue is empty.  The payload is also echoed to
        stdout.  On a socket error during publish, the error is reported on
        stderr and ``sys.exit(2)`` is called (raising ``SystemExit``).
        """
        # Check and swap under the lock so no sample is lost or sent twice.
        with self.stats_lock:
            if not self.stats:
                return
            pending, self.stats = self.stats, []

        payload = "\n".join(
            [self._stat(name, value, when) for name, value, when in pending])

        sys.stdout.write(payload + "\n")
        msg = amqp.Message(payload)
        try:
            self.amqp_chan.basic_publish(msg, exchange=self.amqp_exchange)
        except socket.error as e:
            # Not every socket.error carries (errno, message); take the last
            # argument so one-argument errors are reported too.
            errstr = e.args[-1] if e.args else e
            sys.stderr.write("amqp publish problem: %s\n" % (errstr,))
            sys.exit(2)

    def _stat(self, name, value, now):
        """Format one sample as ``name.cluster.host value timestamp``."""
        return "%s.%s.%s %f %d" % \
               (name, self.cluster, self.host, value, int(now))

if __name__ == "__main__":
    # Entry point: load the ini file (path from argv[1], else the default),
    # build the metric mapping, connect to AMQP and serve gmond datagrams.
    config = ConfigParser.RawConfigParser()
    if len(sys.argv) > 1:
        config.read(sys.argv[1])
    else:
        config.read("/etc/graphlia.ini")

    # Each non-empty line of gmond.mapping is "source:target" or
    # "source:target:rate"; the latter marks target as a rate metric.
    mapping = {}
    use_rates = set()  # set: membership is tested for every recorded sample
    for line in config.get("gmond", "mapping").split("\n"):
        line = line.strip()
        if not line:
            continue
        parts = line.split(":")
        if len(parts) not in (2, 3):
            raise ValueError(
                "mapping %r must be of form gmond:ganglia[:rate]" % (line,))
        mapping[parts[0]] = parts[1]
        if len(parts) == 3 and parts[2] == "rate":
            use_rates.add(parts[1])

    if not mapping:
        raise ValueError("gmond.mapping is missing in the config file")

    host = config.get("collector", "host")
    colo = config.get("collector", "colo")

    amqp_conn = amqp.Connection(host=config.get("amqp", "host"),
                                userid=config.get("amqp", "user"),
                                password=config.get("amqp", "pass"),
                                virtual_host=config.get("amqp", "vhost"),
                                insist=False)
    amqp_exchange = config.get("amqp", "exchange")

    # Must stay a module-level name: GangliaCollector.handle looks up the
    # global "graphite" when recording each metric.
    graphite = GraphiteAggregator(host, colo, mapping, use_rates,
                                  amqp_conn, amqp_exchange)

    server = SocketServer.UDPServer(("127.0.0.1", GANGLIA_LISTEN_PORT),
                                    GangliaCollector)
    server.serve_forever()
Something went wrong with that request. Please try again.