new module: kumofs

commit d16c3deeb7642e163bd29bc95c6e35de5ea36b8a (parent d87e1f7)
authored by hirose31, committed by the Ganglia Development Team
28 kumofs/README.mkdn
@@ -0,0 +1,28 @@
+kumofs
+===============
+
+Python module for Ganglia 3.1.
+
+This module reports kumofs metrics gathered with the kumofs "stats" protocol (via the kumostat command).
+
+## kumofs
+
+Kumofs is a simple and fast distributed key-value store. You can use a memcached client library to set, get, CAS, or delete values in kumofs. The backend storage is Tokyo Cabinet, which gives you great performance.
+
+* Data is partitioned and replicated over multiple servers.
+* Extreme single node performance; comparable with memcached.
+* Both read and write performance improve as servers are added.
+* Servers can be added without stopping the system.
+* Servers can be added without modifying any configuration files.
+* The system does not stop even if one or two servers crash.
+* The system does not need to stop in order to recover crashed servers.
+* Automatic rebalancing support with a consistency control algorithm.
+* Safe CAS operation support.
+* memcached protocol support.
+
+<http://kumofs.sourceforge.net/>
+
+## AUTHOR
+
+HIROSE Masaaki <hirose31@gmail.com>
+
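Since the README states that kumofs speaks the memcached protocol, any memcached client library can exercise it. Below is a minimal sketch (illustrative, not part of this commit) using the python-memcached package; the gateway address localhost:11211 is an assumption and depends on how kumo-gateway is started in your deployment.

    # Sketch only: exercise kumofs through its memcached-compatible interface.
    # Assumes `python-memcached` is installed and a kumo-gateway is listening
    # on localhost:11211 (hypothetical address; adjust to your setup).
    import memcache

    mc = memcache.Client(["localhost:11211"])

    mc.set("greeting", "hello kumofs")   # store a value
    print(mc.get("greeting"))            # -> "hello kumofs"
    mc.delete("greeting")                # remove it again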
49 kumofs/conf.d/kumofs.conf
@@ -0,0 +1,49 @@
+modules {
+  module {
+    name = "kumofs"
+    language = "python"
+
+    param host {
+      value = "localhost"
+    }
+    param port {
+      value = 19800
+    }
+
+    param refresh_rate {
+      value = 20
+    }
+    # param spoof_host {
+    #   value = "IPADDRESS:HOSTNAME"
+    # }
+
+  }
+}
+
+collection_group {
+  collect_every = 30
+  time_threshold = 90
+
+  metric {
+    name = "kumofs_curr_items"
+    title = "Current number of items stored"
+    value_threshold = 0
+  }
+  metric {
+    name = "kumofs_cmd_get"
+    title = "Cumulative number of retrieval reqs"
+    value_threshold = 0
+  }
+  metric {
+    name = "kumofs_cmd_set"
+    title = "Cumulative number of storage reqs"
+    value_threshold = 0
+  }
+  metric {
+    name = "kumofs_cmd_delete"
+    title = "Cumulative number of delete reqs"
+    value_threshold = 0
+  }
+
+}
+
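The metric names in this collection_group mirror the keys returned by the kumofs "stats" command: the Python module (next file) prefixes each reported stat with "kumofs_". A rough sketch of that mapping follows; the sample lines and values are invented for illustration, since real output depends on the running cluster.

    # Illustrative only: sample `kumostat host:19800 stats` lines (values invented)
    # and the Ganglia metric names the module derives from them.
    sample = "STAT curr_items 1024\r\nSTAT cmd_get 4242\r\nSTAT cmd_set 1717"

    for line in sample.splitlines():
        parts = line.split(" ")
        if len(parts) == 3 and parts[0] == "STAT":
            print("kumofs_%s = %s" % (parts[1], parts[2]))
    # kumofs_curr_items = 1024
    # kumofs_cmd_get = 4242
    # kumofs_cmd_set = 1717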
174 kumofs/python_modules/kumofs.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import sys
+import traceback
+import os
+import threading
+import time
+import subprocess
+import re
+
+descriptors = list()
+Desc_Skel = {}
+_Worker_Thread = None
+_Lock = threading.Lock() # synchronization lock
+Debug = False
+
+def dprint(f, *v):
+    if Debug:
+        print >>sys.stderr, "DEBUG: "+f % v
+
+class UpdateMetricThread(threading.Thread):
+
+    def __init__(self, params):
+        threading.Thread.__init__(self)
+        self.running      = False
+        self.shuttingdown = False
+        self.refresh_rate = 20
+        if "refresh_rate" in params:
+            self.refresh_rate = int(params["refresh_rate"])
+        self.metric  = {}
+        self.timeout = 2
+
+        self.host = "localhost"
+        self.port = 19800
+        if "host" in params:
+            self.host = params["host"]
+        if "port" in params:
+            self.port = params["port"]
+
+    def shutdown(self):
+        self.shuttingdown = True
+        if not self.running:
+            return
+        self.join()
+
+    def run(self):
+        self.running = True
+
+        while not self.shuttingdown:
+            _Lock.acquire()
+            self.update_metric()
+            _Lock.release()
+            time.sleep(self.refresh_rate)
+
+        self.running = False
+
+    def update_metric(self):
+        cmd = ["kumostat", "%s:%s" % (self.host, self.port), "stats"]
+        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        pout, perr = proc.communicate()
+
+        for m in re.split('(?:\r\n|\n)', pout):
+            dprint("%s", m)
+            d = m.split(" ")
+            if len(d) == 3 and d[0] == "STAT":
+                self.metric["kumofs_"+d[1]] = int(d[2]) if d[2].isdigit() else d[2]
+
+    def metric_of(self, name):
+        val = 0
+        if name in self.metric:
+            _Lock.acquire()
+            val = self.metric[name]
+            _Lock.release()
+        return val
+
+def metric_init(params):
+    global descriptors, Desc_Skel, _Worker_Thread, Debug
+
+    print '[kumofs] kumofs protocol "stats"'
+    print params
+
+    # initialize skeleton of descriptors
+    Desc_Skel = {
+        'name'        : 'XXX',
+        'call_back'   : metric_of,
+        'time_max'    : 60,
+        'value_type'  : 'uint',
+        'format'      : '%d',
+        'units'       : 'XXX',
+        'slope'       : 'XXX', # zero|positive|negative|both
+        'description' : 'XXX',
+        'groups'      : 'kumofs',
+        }
+
+    if "refresh_rate" not in params:
+        params["refresh_rate"] = 20
+    if "debug" in params:
+        Debug = params["debug"]
+        dprint("%s", "Debug mode on")
+
+    _Worker_Thread = UpdateMetricThread(params)
+    _Worker_Thread.start()
+
+    # IP:HOSTNAME
+    if "spoof_host" in params:
+        Desc_Skel["spoof_host"] = params["spoof_host"]
+
+    descriptors.append(create_desc(Desc_Skel, {
+        "name"       : "kumofs_curr_items",
+        "units"      : "items",
+        "slope"      : "both",
+        "description": "Current number of items stored",
+        }))
+    descriptors.append(create_desc(Desc_Skel, {
+        "name"       : "kumofs_cmd_get",
+        "units"      : "commands",
+        "slope"      : "positive",
+        "description": "Cumulative number of retrieval reqs",
+        }))
+    descriptors.append(create_desc(Desc_Skel, {
+        "name"       : "kumofs_cmd_set",
+        "units"      : "commands",
+        "slope"      : "positive",
+        "description": "Cumulative number of storage reqs",
+        }))
+    descriptors.append(create_desc(Desc_Skel, {
+        "name"       : "kumofs_cmd_delete",
+        "units"      : "commands",
+        "slope"      : "positive",
+        "description": "Cumulative number of delete reqs",
+        }))
+
+    return descriptors
+
+def create_desc(skel, prop):
+    d = skel.copy()
+    for k,v in prop.iteritems():
+        d[k] = v
+    return d
+
+def metric_of(name):
+    return _Worker_Thread.metric_of(name)
+
+def metric_cleanup():
+    _Worker_Thread.shutdown()
+
+if __name__ == '__main__':
+    try:
+        params = {
+            "host"  : "s101",
+            "port"  : 19800,
+            "debug" : True,
+            }
+        metric_init(params)
+
+        # for d in descriptors:
+        #     print ''' metric {
+        #       name = "%s"
+        #       title = "%s"
+        #       value_threshold = 0
+        #     }''' % (d["name"], d["description"])
+
+        while True:
+            for d in descriptors:
+                v = d['call_back'](d['name'])
+                print ('value for %s is '+d['format']) % (d['name'], v)
+            time.sleep(5)
+    except KeyboardInterrupt:
+        time.sleep(0.2)
+        os._exit(1)
+    except:
+        traceback.print_exc()
+        os._exit(1)
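Outside of gmond, the module can be smoke-tested through the same three entry points gmond itself uses: metric_init(), each descriptor's call_back, and metric_cleanup(). A minimal sketch, assuming this file is importable as kumofs, kumostat is in PATH, and a kumofs manager answers on localhost:19800:

    # Sketch only: drive the module the way gmond does.
    import time
    import kumofs

    descriptors = kumofs.metric_init({"host": "localhost",
                                      "port": 19800,
                                      "refresh_rate": 5})
    time.sleep(2)  # give the worker thread time for its first "stats" refresh
    for d in descriptors:
        print("%s = %s" % (d["name"], d["call_back"](d["name"])))
    # metric_cleanup() may block for up to refresh_rate seconds while the
    # worker thread wakes from its sleep and notices the shutdown flag.
    kumofs.metric_cleanup()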