MB-5002: add cbclusterstats tool

cbclusterstats will collect stats against all nodes in a cluster

Change-Id: I004b7768ea736a5c4d85c413fd38d0039d81cc78
Reviewed-on: http://review.couchbase.org/16968
Reviewed-by: Steve Yen <steve.yen@gmail.com>
Tested-by: Steve Yen <steve.yen@gmail.com>
commit 40bd0f4de4443ed8c5c30e7a6393be13465d9bcd 1 parent e56a5ce
bcui6611 authored
8 Makefile.am
@@ -7,19 +7,25 @@ default:
pythonlibdir=$(libdir)/python
nobase_pythonlib_DATA= \
buckets.py \
+ cluster_stats.py \
+ collector.py \
+ diskqueue_stats.py \
info.py \
listservers.py \
node.py \
+ node_stats.py \
+ processor.py \
restclient.py \
simplejson/LICENSE.txt \
simplejson/__init__.py \
simplejson/decoder.py \
simplejson/encoder.py \
simplejson/scanner.py \
+ stats_buffer.py \
usage.py \
util_cli.py
-pythonlib_SCRIPTS= couchbase-cli
+pythonlib_SCRIPTS= couchbase-cli cbclusterstats
PYTHON_TOOLS= wrapper/couchbase-cli
46 buckets.py
@@ -11,6 +11,9 @@
'bucket-delete': '/pools/default/buckets/',
'bucket-create': '/pools/default/buckets/',
'bucket-edit': '/pools/default/buckets/',
+ 'bucket-get': '/pools/default/buckets',
+ 'bucket-stats': '/pools/default/buckets/{0}/stats?zoom=hour',
+ 'bucket-node-stats': '/pools/default/buckets/{0}/stats/{1}?zoom={2}'
}
methods = {
'bucket-list': 'GET',
@@ -18,7 +21,9 @@
'bucket-create': 'POST',
'bucket-edit': 'POST',
'bucket-flush': 'POST',
+ 'bucket-get': 'GET',
'bucket-stats': 'GET',
+ 'bucket-node-stats': 'GET',
}
class Buckets:
@@ -102,7 +107,9 @@ def runCmd(self, cmd, server, port,
data = rest.restCmd(methods[cmd], self.rest_cmd,
self.user, self.password, opts)
- if cmd == "bucket-list":
+ if cmd in ("bucket-get", "bucket-stats", "bucket-node-stats"):
+ return rest.getJson(data)
+ elif cmd == "bucket-list":
if output == 'json':
print data
else:
@@ -123,3 +130,40 @@ def runCmd(self, cmd, server, port,
print rest.jsonMessage(data)
else:
print data
+
+class BucketStats:
+ def __init__(self, bucket_name):
+ self.debug = False
+ self.rest_cmd = rest_cmds['bucket-stats'].format(bucket_name)
+ self.method = 'GET'
+
+ def runCmd(self, cmd, server, port,
+ user, password, opts):
+ opts = {}
+ opts['error_msg'] = "unable to %s" % cmd
+ opts['success_msg'] = "%s" % cmd
+
+ #print server, port, cmd, self.rest_cmd
+ rest = restclient.RestClient(server, port, {'debug':self.debug})
+ data = rest.restCmd(methods[cmd], self.rest_cmd,
+ user, password, opts)
+ return rest.getJson(data)
+
+class BucketNodeStats:
+ def __init__(self, bucket_name, stat_name, scale):
+ self.debug = False
+ self.rest_cmd = rest_cmds['bucket-node-stats'].format(bucket_name, stat_name, scale)
+ self.method = 'GET'
+ #print self.rest_cmd
+
+ def runCmd(self, cmd, server, port,
+ user, password, opts):
+ opts = {}
+ opts['error_msg'] = "unable to %s" % cmd
+ opts['success_msg'] = "%s" % cmd
+
+ #print server, port, cmd, self.rest_cmd
+ rest = restclient.RestClient(server, port, {'debug':self.debug})
+ data = rest.restCmd(methods[cmd], self.rest_cmd,
+ user, password, opts)
+ return rest.getJson(data)
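For reference, a minimal sketch of how these new classes are driven; the host, port, credentials, and bucket name below are placeholders, not part of the commit:

    import buckets

    server, port, user, password = '127.0.0.1', 8091, 'Administrator', 'password'

    # Hour-scale summary stats for one bucket (assumes a bucket named 'default').
    summary = buckets.BucketStats('default').runCmd(
        'bucket-stats', server, port, user, password, {})

    # Per-node samples for a single counter at minute resolution.
    per_node = buckets.BucketNodeStats('default', 'cmd_get', 'minute').runCmd(
        'bucket-node-stats', server, port, user, password, {})

Both calls return the parsed JSON from the REST endpoints registered above.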
86 cbclusterstats
@@ -0,0 +1,86 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+import getopt
+import sys
+import os
+import traceback
+import copy
+import logging
+
+import collector
+import stats_buffer
+import util
+
+import processor
+
+log = logging.getLogger('cbclusterstats')
+log.setLevel(logging.INFO)
+log.addHandler(logging.StreamHandler())
+
+def parse_opt():
+ (cluster, user, password, txtfile) = ('', '', '', 'clusterstats.txt')
+
+ try:
+ (opts, _args) = getopt.getopt(sys.argv[1:],
+ 'c:dp:u:o:', [
+ 'cluster=',
+ 'debug',
+ 'password=',
+ 'user=',
+ 'output=',
+ ])
+ except getopt.GetoptError, err:
+ usage(err)
+
+ for (opt, arg) in opts:
+ if opt in ('-c', '--cluster'):
+ cluster = arg
+ if opt in ('-u', '--user'):
+ user = arg
+ if opt in ('-p', '--password'):
+ password = arg
+ if opt in ('-d', '--debug'):
+ log.setLevel(logging.DEBUG)
+ if opt in ('-o', '--output'):
+ txtfile = arg
+
+ if not cluster or not user or not password:
+ usage()
+ return (cluster, user, password, txtfile, opts)
+
+def usage(error_msg=''):
+ if error_msg:
+ print "ERROR: %s" % error_msg
+ sys.exit(2)
+
+ print """cbclusterstats - cluster key performance indicator stats
+
+usage: cbclusterstats CLUSTER USERNAME PASSWORD OPTIONS
+
+CLUSTER:
+ --cluster=HOST[:PORT] or -c HOST[:PORT] Default port is 8091
+USERNAME:
+ -u USERNAME, --user=USERNAME admin username of the cluster
+PASSWORD:
+ -p PASSWORD, --password=PASSWORD admin password of the cluster
+OPTIONS:
+ -o FILENAME, --output=FILENAME Default output filename is 'clusterstats.txt'
+ -d, --debug
+
+"""
+ sys.exit(2)
+
+def main():
+ (cluster, user, password, txtfile, opts) = parse_opt()
+ #make snapshot for the current cluster status
+ retriever = collector.StatsCollector(log)
+ retriever.collect_data(cluster, user, password, opts)
+
+ #analyze the snapshot and historic data
+ performer = processor.StatsAnalyzer(log)
+ performer.run_analysis()
+ performer.run_report(txtfile)
+
+if __name__ == '__main__':
+ main()
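A typical invocation, with placeholder host and credentials: cbclusterstats -c 127.0.0.1:8091 -u Administrator -p password -o clusterstats.txt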
510 cluster_stats.py
@@ -0,0 +1,510 @@
+import stats_buffer
+import util_cli as util
+
+class BucketSummary:
+ def run(self, accessor):
+ return stats_buffer.bucket_info
+
+class DGMRatio:
+ def run(self, accessor):
+ result = []
+ hdd_total = 0
+ ram_total = 0
+ for node, nodeinfo in stats_buffer.nodes.iteritems():
+ if nodeinfo["StorageInfo"].has_key("hdd"):
+ hdd_total += nodeinfo['StorageInfo']['hdd']['usedByData']
+ if nodeinfo["StorageInfo"].has_key("ram"):
+ ram_total += nodeinfo['StorageInfo']['ram']['usedByData']
+ if ram_total > 0:
+ ratio = hdd_total / ram_total
+ else:
+ ratio = 0
+ return ratio
+
+class ARRatio:
+ def run(self, accessor):
+ result = {}
+ cluster = 0
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ item_avg = {
+ "curr_items": [],
+ "vb_replica_curr_items": [],
+ }
+ num_error = []
+ for counter in accessor["counter"]:
+ values = stats_info[accessor["scale"]][counter]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ for node, vals in nodeStats.iteritems():
+ avg = sum(vals) / samplesCount
+ item_avg[counter].append((node, avg))
+ res = []
+ active_total = replica_total = 0
+ for active, replica in zip(item_avg['curr_items'], item_avg['vb_replica_curr_items']):
+ if replica[1] == 0:
+ res.append((active[0], "No replica"))
+ else:
+ ratio = 1.0 * active[1] / replica[1]
+ res.append((active[0], util.pretty_float(ratio)))
+ if ratio < accessor["threshold"]:
+ num_error.append({"node":active[0], "value": ratio})
+ active_total += active[1]
+ replica_total += replica[1]
+ if replica_total == 0:
+ res.append(("total", "no replica"))
+ else:
+ ratio = active_total * 1.0 / replica_total
+ cluster += ratio
+ res.append(("total", util.pretty_float(ratio)))
+ if ratio != accessor["threshold"]:
+ num_error.append({"node":"total", "value": ratio})
+ #if len(num_error) > 0:
+ # result[bucket] = {"error" : num_error}
+ #else:
+ result[bucket] = res
+ if len(stats_buffer.buckets) > 0:
+ result["cluster"] = util.pretty_float(cluster / len(stats_buffer.buckets))
+
+ return result
+
+class OpsRatio:
+ def run(self, accessor):
+ result = {}
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ ops_avg = {
+ "cmd_get": [],
+ "cmd_set": [],
+ "delete_hits" : [],
+ }
+ for counter in accessor["counter"]:
+ values = stats_info[accessor["scale"]][counter]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ for node, vals in nodeStats.iteritems():
+ avg = sum(vals) / samplesCount
+ ops_avg[counter].append((node, avg))
+ res = []
+ read_total = write_total = del_total = 0
+ for read, write, delete in zip(ops_avg['cmd_get'], ops_avg['cmd_set'], ops_avg['delete_hits']):
+ count = read[1] + write[1] + delete[1]
+ if count == 0:
+ res.append((read[0], "0:0:0"))
+ else:
+ read_ratio = read[1] *100 / count
+ read_total += read_ratio
+ write_ratio = write[1] * 100 / count
+ write_total += write_ratio
+ del_ratio = delete[1] * 100 / count
+ del_total += del_ratio
+ res.append((read[0], "{0}:{1}:{2}".format(int(read_ratio+.5), int(write_ratio+.5), int(del_ratio+.5))))
+ read_total /= len(ops_avg['cmd_get'])
+ write_total /= len(ops_avg['cmd_set'])
+ del_total /= len(ops_avg['delete_hits'])
+ res.append(("total", "{0}:{1}:{2}".format(int(read_total+.5), int(write_total+.5), int(del_total+.5))))
+ result[bucket] = res
+
+ return result
+
+class CacheMissRatio:
+ def run(self, accessor):
+ result = {}
+ cluster = 0
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ values = stats_info[accessor["scale"]][accessor["counter"]]
+ timestamps = values["timestamp"]
+ timestamps = [x - timestamps[0] for x in timestamps]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ trend = []
+ total = 0
+ data = []
+ for node, vals in nodeStats.iteritems():
+ #a, b = util.linreg(timestamps, vals)
+ value = sum(vals) / samplesCount
+ #value = a * timestamps[-1] + b
+ total += value
+ trend.append((node, util.pretty_float(value)))
+ data.append(value)
+ total /= len(nodeStats)
+ trend.append(("total", util.pretty_float(total)))
+ trend.append(("variance", util.two_pass_variance(data)))
+ cluster += total
+ result[bucket] = trend
+ if len(stats_buffer.buckets) > 0:
+ result["cluster"] = util.pretty_float(cluster / len(stats_buffer.buckets))
+ return result
+
+class MemUsed:
+ def run(self, accessor):
+ result = {}
+ cluster = 0
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ values = stats_info[accessor["scale"]][accessor["counter"]]
+ timestamps = values["timestamp"]
+ timestamps = [x - timestamps[0] for x in timestamps]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ trend = []
+ total = 0
+ data = []
+ for node, vals in nodeStats.iteritems():
+ avg = sum(vals) / samplesCount
+ trend.append((node, util.size_label(avg)))
+ data.append(avg)
+ #print data
+ trend.append(("variance", util.two_pass_variance(data)))
+ result[bucket] = trend
+ return result
+
+class ItemGrowth:
+ def run(self, accessor):
+ result = {}
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ trend = []
+ values = stats_info[accessor["scale"]][accessor["counter"]]
+ timestamps = values["timestamp"]
+ timestamps = [x - timestamps[0] for x in timestamps]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ for node, vals in nodeStats.iteritems():
+ a, b = util.linreg(timestamps, vals)
+ if b < 1:
+ trend.append((node, 0))
+ else:
+ start_val = b
+ end_val = a * timestamps[-1] + b
+ rate = (end_val * 1.0 / b - 1.0) * 100
+ trend.append((node, util.pretty_float(rate)))
+ result[bucket] = trend
+ return result
+
+class NumVbuckt:
+ def run(self, accessor):
+ result = {}
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ num_error = []
+ values = stats_info[accessor["scale"]][accessor["counter"]]
+ nodeStats = values["nodeStats"]
+ for node, vals in nodeStats.iteritems():
+ if vals[-1] < accessor["threshold"]:
+ num_error.append({"node":node, "value":vals[-1]})
+ if len(num_error) > 0:
+ result[bucket] = {"error" : num_error}
+ return result
+
+class RebalanceStuck:
+ def run(self, accessor):
+ result = {}
+ for bucket, bucket_stats in stats_buffer.node_stats.iteritems():
+ num_error = []
+ for node, stats_info in bucket_stats.iteritems():
+ for key, value in stats_info.iteritems():
+ if key.find(accessor["counter"]) >= 0:
+ if accessor.has_key("threshold"):
+ if int(value) > accessor["threshold"]:
+ num_error.append({"node":node, "value": (key, value)})
+ else:
+ num_error.append({"node":node, "value": (key, value)})
+ if len(num_error) > 0:
+ result[bucket] = {"error" : num_error}
+ return result
+
+class MemoryFramentation:
+ def run(self, accessor):
+ result = {}
+ for bucket, bucket_stats in stats_buffer.node_stats.iteritems():
+ num_error = []
+ for node, stats_info in bucket_stats.iteritems():
+ for key, value in stats_info.iteritems():
+ if key.find(accessor["counter"]) >= 0:
+ if accessor.has_key("threshold"):
+ if int(value) > accessor["threshold"]:
+ if accessor.has_key("unit"):
+ if accessor["unit"] == "time":
+ num_error.append({"node":node, "value": (key, util.time_label(value))})
+ elif accessor["unit"] == "size":
+ num_error.append({"node":node, "value": (key, util.size_label(value))})
+ else:
+ num_error.append({"node":node, "value": (key, value)})
+ else:
+ num_error.append({"node":node, "value": (key, value)})
+ if len(num_error) > 0:
+ result[bucket] = {"error" : num_error}
+ return result
+
+class EPEnginePerformance:
+ def run(self, accessor):
+ result = {}
+ for bucket, bucket_stats in stats_buffer.node_stats.iteritems():
+ num_error = []
+ for node, stats_info in bucket_stats.iteritems():
+ for key, value in stats_info.iteritems():
+ if key.find(accessor["counter"]) >= 0:
+ if accessor.has_key("threshold"):
+ if accessor["counter"] == "flusherState" and value != accessor["threshold"]:
+ num_error.append({"node":node, "value": (key, value)})
+ elif accessor["counter"] == "flusherCompleted" and value == accessor["threshold"]:
+ num_error.append({"node":node, "value": (key, value)})
+ else:
+ if value > accessor["threshold"]:
+ num_error.append({"node":node, "value": (key, value)})
+ if len(num_error) > 0:
+ result[bucket] = {"error" : num_error}
+ return result
+
+class TotalDataSize:
+ def run(self, accessor):
+ result = []
+ total = 0
+ for node, nodeinfo in stats_buffer.nodes.iteritems():
+ if nodeinfo["StorageInfo"].has_key("hdd"):
+ total += nodeinfo['StorageInfo']['hdd']['usedByData']
+ result.append(util.size_label(total))
+ return result
+
+class AvailableDiskSpace:
+ def run(self, accessor):
+ result = []
+ total = 0
+ for node, nodeinfo in stats_buffer.nodes.iteritems():
+ if nodeinfo["StorageInfo"].has_key("hdd"):
+ total += nodeinfo['StorageInfo']['hdd']['free']
+ result.append(util.size_label(total))
+ return result
+
+ClusterCapsule = [
+ {"name" : "TotalDataSize",
+ "ingredients" : [
+ {
+ "name" : "totalDataSize",
+ "description" : "Total Data Size across cluster",
+ "code" : "TotalDataSize",
+ }
+ ],
+ "clusterwise" : True,
+ "perNode" : False,
+ "perBucket" : False,
+ },
+ {"name" : "AvailableDiskSpace",
+ "ingredients" : [
+ {
+ "name" : "availableDiskSpace",
+ "description" : "Available disk space",
+ "code" : "AvailableDiskSpace",
+ }
+ ],
+ "clusterwise" : True,
+ "perNode" : False,
+ "perBucket" : False,
+ },
+ {"name" : "CacheMissRatio",
+ "ingredients" : [
+ {
+ "name" : "cacheMissRatio",
+ "description" : "Cache miss ratio",
+ "counter" : "ep_cache_miss_rate",
+ "scale" : "hour",
+ "code" : "CacheMissRatio",
+ "unit" : "percentage",
+ "threshold" : 2,
+ },
+ ],
+ "clusterwise" : True,
+ "perNode" : True,
+ "perBucket" : True,
+ "indicator" : False,
+ "nodeDisparate" : True,
+ },
+ {"name" : "DGM",
+ "ingredients" : [
+ {
+ "name" : "dgm",
+ "description" : "Disk to Memory Ratio",
+ "code" : "DGMRatio"
+ },
+ ],
+ "clusterwise" : True,
+ "perNode" : False,
+ "perBucket" : False,
+ },
+ {"name" : "BucketSummary",
+ "ingredients" : [
+ {
+ "name" : "bucketSummary",
+ "description" : "Bucket performance summary",
+ "code" : "BucketSummary",
+ },
+ ],
+ "clusterwise" : True,
+ },
+ {"name" : "ActiveReplicaResidentRatio",
+ "ingredients" : [
+ {
+ "name" : "activeReplicaResidencyRatio",
+ "description" : "Active and Replica Resident Ratio",
+ "counter" : ["curr_items", "vb_replica_curr_items"],
+ "scale" : "minute",
+ "code" : "ARRatio",
+ "threshold" : 1,
+ },
+ ],
+ "clusterwise" : True,
+ "perNode" : True,
+ "perBucket" : True,
+ "indicator" : True,
+ },
+ {"name" : "OPSPerformance",
+ "ingredients" : [
+ {
+ "name" : "opsPerformance",
+ "description" : "Read/Write/Delete ops ratio",
+ "scale" : "minute",
+ "counter" : ["cmd_get", "cmd_set", "delete_hits"],
+ "code" : "OpsRatio",
+ },
+ ],
+ "perBucket" : True,
+ },
+ {"name" : "GrowthRate",
+ "ingredients" : [
+ {
+ "name" : "dataGrowthRateForItems",
+ "description" : "Data Growth rate for items",
+ "counter" : "curr_items",
+ "scale" : "day",
+ "code" : "ItemGrowth",
+ "unit" : "percentage",
+ },
+ ]
+ },
+ {"name" : "VBucketNumber",
+ "ingredients" : [
+ {
+ "name" : "activeVbucketNumber",
+ "description" : "Active VBucket number is less than expected",
+ "counter" : "vb_active_num",
+ "scale" : "hour",
+ "code" : "NumVbuckt",
+ "threshold" : 1024,
+ },
+ {
+ "name" : "replicaVBucketNumber",
+ "description" : "Replica VBucket number is less than expected",
+ "counter" : "vb_replica_num",
+ "scale" : "hour",
+ "code" : "NumVbuckt",
+ "threshold" : 1024,
+ },
+ ],
+ "indicator" : True,
+ },
+ {"name" : "MemoryUsage",
+ "ingredients" : [
+ {
+ "name" : "memoryUsage",
+ "description" : "Check memory usage",
+ "counter" : "mem_used",
+ "scale" : "hour",
+ "code" : "MemUsed",
+ },
+ ],
+ "nodeDisparate" : True,
+ },
+ {"name" : "RebalancePerformance",
+ "ingredients" : [
+ {
+ "name" : "rebalanceStuck",
+ "description" : "Check if rebalance is stuck",
+ "counter" : "idle",
+ "code" : "RebalanceStuck",
+ },
+ {
+ "name" : "highBackfillRemaing",
+ "description" : "Tap queue backfill remaining is too high",
+ "counter" : "ep_tap_queue_backfillremaining",
+ "code" : "RebalanceStuck",
+ "threshold" : 1000,
+ },
+ ],
+ "indicator" : True,
+ },
+ {"name" : "MemoryFragmentation",
+ "ingredients" : [
+ {
+ "name" : "totalFragmentation",
+ "description" : "Total memory fragmentation",
+ "counter" : "total_fragmentation_bytes",
+ "code" : "MemoryFramentation",
+ "unit" : "size",
+ "threshold" : 1073741824, # 1GB
+ },
+ {
+ "name" : "diskDelete",
+ "description" : "Average disk delete time",
+ "counter" : "disk_del",
+ "code" : "MemoryFramentation",
+ "unit" : "time",
+ "threshold" : 1000 #1ms
+ },
+ {
+ "name" : "diskUpdate",
+ "description" : "Average disk update time",
+ "counter" : "disk_update",
+ "code" : "MemoryFramentation",
+ "unit" : "time",
+ "threshold" : 1000 #1ms
+ },
+ {
+ "name" : "diskInsert",
+ "description" : "Average disk insert time",
+ "type" : "python",
+ "counter" : "disk_insert",
+ "code" : "MemoryFramentation",
+ "unit" : "time",
+ "threshold" : 1000 #1ms
+ },
+ {
+ "name" : "diskCommit",
+ "description" : "Average disk commit time",
+ "counter" : "disk_commit",
+ "code" : "MemoryFramentation",
+ "unit" : "time",
+ "threshold" : 5000000 #5s
+ },
+ ],
+ "indicator" : True,
+ },
+ {"name" : "EPEnginePerformance",
+ "ingredients" : [
+ {
+ "name" : "flusherState",
+ "description" : "Engine flusher state",
+ "counter" : "ep_flusher_state",
+ "code" : "EPEnginePerformance",
+ "threshold" : "running",
+ },
+ {
+ "name" : "flusherCompleted",
+ "description" : "Flusher completed",
+ "counter" : "ep_flusher_num_completed",
+ "code" : "EPEnginePerformance",
+ "threshold" : 0
+ },
+ {
+ "name" : "avgItemLoadTime",
+ "description" : "Average item loaded time",
+ "counter" : "ep_bg_load_avg",
+ "code" : "EPEnginePerformance",
+ "threshold" : 100,
+ },
+ {
+ "name" : "avgItemWaitTime",
+ "description" : "Average item waited time",
+ "counter" : "ep_bg_wait_avg",
+ "code" : "EPEnginePerformance",
+ "threshold" : 100
+ },
+ ],
+ "indicator" : True,
+ },
+]
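A minimal smoke test for one of the checkers above; the bucket name, node address, and samples are invented, and the nesting mirrors the layout collector.py stores into stats_buffer.buckets:

    import stats_buffer
    import cluster_stats

    stats_buffer.buckets['default'] = {
        'hour': {
            'ep_cache_miss_rate': {
                'timestamp': [0, 60000, 120000],
                'samplesCount': 3,
                'nodeStats': {'10.1.1.1:8091': [1.0, 2.0, 3.0]},
            }
        }
    }
    accessor = {'counter': 'ep_cache_miss_rate', 'scale': 'hour', 'threshold': 2}
    # Expect a per-node average, a 'total' entry, and a 'variance' entry for the
    # bucket, plus a cluster-wide average under the 'cluster' key.
    print cluster_stats.CacheMissRatio().run(accessor)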
233 collector.py
@@ -0,0 +1,233 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+import sys
+import traceback
+import copy
+import logging
+
+import listservers
+import buckets
+import node
+import info
+import util_cli as util
+import mc_bin_client
+
+import stats_buffer
+
+class StatsCollector:
+ def __init__(self, log):
+ self.log = log
+
+ def seg(self, k, v):
+ # Parse ('some_stat_x,y', 'v') into (('some_stat', x, y), v)
+ ka = k.split('_')
+ k = '_'.join(ka[0:-1])
+ kstart, kend = [int(x) for x in ka[-1].split(',')]
+ return ((k, kstart, kend), int(v))
+
+ def retrieve_node_stats(self, nodeInfo, nodeStats):
+ nodeStats['portDirect'] = nodeInfo['ports']['direct']
+ nodeStats['portProxy'] = nodeInfo['ports']['proxy']
+ nodeStats['clusterMembership'] = nodeInfo['clusterMembership']
+ nodeStats['os'] = nodeInfo['os']
+ nodeStats['uptime'] = nodeInfo['uptime']
+ nodeStats['version'] = nodeInfo['version']
+
+ #memory
+ nodeStats['memory'] = {}
+ nodeStats['memory']['allocated'] = nodeInfo['mcdMemoryAllocated']
+ nodeStats['memory']['reserved'] = nodeInfo['mcdMemoryReserved']
+ nodeStats['memory']['free'] = nodeInfo['memoryFree']
+ nodeStats['memory']['quota'] = nodeInfo['memoryQuota']
+ nodeStats['memory']['total'] = nodeInfo['memoryTotal']
+
+ #storageInfo
+ nodeStats['StorageInfo'] = {}
+ if nodeInfo['storageTotals'] is not None:
+
+ #print nodeInfo
+ hdd = nodeInfo['storageTotals']['hdd']
+ if hdd is not None:
+ nodeStats['StorageInfo']['hdd'] = {}
+ nodeStats['StorageInfo']['hdd']['free'] = hdd['free']
+ nodeStats['StorageInfo']['hdd']['quotaTotal'] = hdd['quotaTotal']
+ nodeStats['StorageInfo']['hdd']['total'] = hdd['total']
+ nodeStats['StorageInfo']['hdd']['used'] = hdd['used']
+ nodeStats['StorageInfo']['hdd']['usedByData'] = hdd['usedByData']
+ ram = nodeInfo['storageTotals']['ram']
+ if ram is not None:
+ nodeStats['StorageInfo']['ram'] = {}
+ nodeStats['StorageInfo']['ram']['quotaTotal'] = ram['quotaTotal']
+ nodeStats['StorageInfo']['ram']['total'] = ram['total']
+ nodeStats['StorageInfo']['ram']['used'] = ram['used']
+ nodeStats['StorageInfo']['ram']['usedByData'] = ram['usedByData']
+ if ram.has_key('quotaUsed'):
+ nodeStats['StorageInfo']['ram']['quotaUsed'] = ram['quotaUsed']
+ else:
+ nodeStats['StorageInfo']['ram']['quotaUsed'] = 0
+
+ #system stats
+ nodeStats['systemStats'] = {}
+ nodeStats['systemStats']['cpu_utilization_rate'] = nodeInfo['systemStats']['cpu_utilization_rate']
+ nodeStats['systemStats']['swap_total'] = nodeInfo['systemStats']['swap_total']
+ nodeStats['systemStats']['swap_used'] = nodeInfo['systemStats']['swap_used']
+
+ curr_items = 0
+ curr_items_tot = 0
+ vb_rep_curr_items = 0
+ if nodeInfo['interestingStats'] is not None:
+ if nodeInfo['interestingStats'].has_key('curr_items'):
+ curr_items = nodeInfo['interestingStats']['curr_items']
+ else:
+ curr_items = 0
+ if nodeInfo['interestingStats'].has_key('curr_items_tot'):
+ curr_items_tot = nodeInfo['interestingStats']['curr_items_tot']
+ else:
+ curr_items_tot = 0
+ if nodeInfo['interestingStats'].has_key('vb_replica_curr_items'):
+ vb_rep_curr_items = nodeInfo['interestingStats']['vb_replica_curr_items']
+ else:
+ vb_rep_curr_items = 0
+
+ nodeStats['systemStats']['currentItems'] = curr_items
+ nodeStats['systemStats']['currentItemsTotal'] = curr_items_tot
+ nodeStats['systemStats']['replicaCurrentItems'] = vb_rep_curr_items
+
+ def get_hostlist(self, server, port, user, password, opts):
+ try:
+ opts.append(("-o", "return"))
+ nodes = listservers.ListServers().runCmd('host-list', server, port, user, password, opts)
+
+ for node in nodes:
+ (node_server, node_port) = util.hostport(node['hostname'])
+ node_stats = {"host" : node_server,
+ "port" : node_port,
+ "status" : node['status'],
+ "master" : server}
+ stats_buffer.nodes[node['hostname']] = node_stats
+ if node['status'] == 'healthy':
+ node_info = info.Info().runCmd('get-server-info', node_server, node_port, user, password, opts)
+ self.retrieve_node_stats(node_info, node_stats)
+ else:
+ self.log.error("Unhealthy node: %s:%s" %(node_server, node['status']))
+ return nodes
+ except Exception, err:
+ traceback.print_exc()
+ sys.exit(1)
+
+ def get_bucketlist(self, server, port, user, password, opts):
+ try:
+ bucketlist = buckets.Buckets().runCmd('bucket-get', server, port, user, password, opts)
+ for bucket in bucketlist:
+ bucket_name = bucket['name']
+ self.log.info("bucket: %s" % bucket_name)
+ bucketinfo = {}
+ bucketinfo['name'] = bucket_name
+ bucketinfo['bucketType'] = bucket['bucketType']
+ bucketinfo['authType'] = bucket['authType']
+ bucketinfo['saslPassword'] = bucket['saslPassword']
+ bucketinfo['numReplica'] = bucket['replicaNumber']
+ bucketinfo['ramQuota'] = bucket['quota']['ram']
+ bucketinfo['master'] = server
+
+ bucketStats = bucket['basicStats']
+ bucketinfo['bucketStats'] = {}
+ bucketinfo['bucketStats']['diskUsed'] = bucketStats['diskUsed']
+ bucketinfo['bucketStats']['memUsed'] = bucketStats['memUsed']
+ bucketinfo['bucketStats']['diskFetches'] = bucketStats['diskFetches']
+ bucketinfo['bucketStats']['quotaPercentUsed'] = bucketStats['quotaPercentUsed']
+ bucketinfo['bucketStats']['opsPerSec'] = bucketStats['opsPerSec']
+ bucketinfo['bucketStats']['itemCount'] = bucketStats['itemCount']
+
+ stats_buffer.bucket_info[bucket_name] = bucketinfo
+
+ # get bucket related stats
+ c = buckets.BucketStats(bucket_name)
+ json = c.runCmd('bucket-stats', server, port, user, password, opts)
+ stats_buffer.buckets_summary[bucket_name] = json
+ return bucketlist
+ except Exception, err:
+ traceback.print_exc()
+ sys.exit(1)
+
+ def get_mc_stats_per_node(self, mc, stats):
+ cmd_list = ["timings", "tap", "checkpoint", "memory", ""]
+ #cmd_list = ["tap"]
+ try:
+ for cmd in cmd_list:
+ node_stats = mc.stats(cmd)
+ if node_stats:
+ if cmd == "timings":
+ # need to preprocess histogram data first
+ vals = sorted([self.seg(*kv) for kv in node_stats.items()])
+ dd = {}
+ totals = {}
+ longest = 0
+ for s in vals:
+ avg = (s[0][1] + s[0][2]) / 2
+ k = s[0][0]
+ l = dd.get(k, [])
+ l.append((avg, s[1]))
+ dd[k] = l
+ totals[k] = totals.get(k, 0) + s[1]
+ for k in sorted(dd):
+ ccount = 0
+ for lbl,v in dd[k]:
+ ccount += v * lbl
+ stats[k] = ccount / totals[k]
+ else:
+ for key, val in node_stats.items():
+ stats[key] = val
+ except Exception, err:
+ traceback.print_exc()
+
+ def get_mc_stats(self, server, bucketlist, nodes):
+ #print util.pretty_print(bucketlist)
+ for bucket in bucketlist:
+ bucket_name = bucket['name']
+ stats_buffer.node_stats[bucket_name] = {}
+ for node in nodes:
+ (node_server, node_port) = util.hostport(node['hostname'])
+ self.log.info(" node: %s %s" % (node_server, node['ports']['direct']))
+ stats = {}
+ mc = mc_bin_client.MemcachedClient(node_server, node['ports']['direct'])
+ if bucket["name"] != "Default":
+ mc.sasl_auth_plain(bucket_name.encode("utf8"), bucket["saslPassword"].encode("utf8"))
+ self.get_mc_stats_per_node(mc, stats)
+ stats_buffer.node_stats[bucket_name][node['hostname']] = stats
+
+ def get_ns_stats(self, bucketlist, server, port, user, password, opts):
+ for bucket in bucketlist:
+ bucket_name = bucket['name']
+ stats_buffer.buckets[bucket_name] = copy.deepcopy(stats_buffer.stats)
+ cmd = 'bucket-node-stats'
+ for scale, stat_set in stats_buffer.buckets[bucket_name].iteritems():
+ for stat in stat_set.iterkeys():
+ sys.stderr.write('.')
+ self.log.debug("retrieve: %s" % stat)
+ c = buckets.BucketNodeStats(bucket_name, stat, scale)
+
+ json = c.runCmd('bucket-node-stats', server, port, user, password, opts)
+ stats_buffer.buckets[bucket_name][scale][stat] = json
+ sys.stderr.write('\n')
+
+ def collect_data(self,cluster, user, password, opts):
+ server, port = util.hostport(cluster)
+
+ #get node list info
+ nodes = self.get_hostlist(server, port, user, password, opts)
+ self.log.debug(util.pretty_print(stats_buffer.nodes))
+
+ #get bucket list
+ bucketlist = self.get_bucketlist(server, port, user, password, opts)
+ self.log.debug(util.pretty_print(stats_buffer.bucket_info))
+
+ #get stats from ep-engine
+ self.get_mc_stats(server, bucketlist, nodes)
+ self.log.debug(util.pretty_print(stats_buffer.node_stats))
+
+ #get stats from ns-server
+ self.get_ns_stats(bucketlist, server, port, user, password, opts)
+ self.log.debug(util.pretty_print(stats_buffer.buckets))
+
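A sketch of driving the collector directly; the address and credentials are assumptions, and the empty list stands in for the getopt-style opts that cbclusterstats normally passes through:

    import logging

    import collector
    import stats_buffer

    log = logging.getLogger('example')
    log.addHandler(logging.StreamHandler())

    collector.StatsCollector(log).collect_data('127.0.0.1:8091',
                                               'Administrator', 'password', [])
    # Snapshots land in the module-level dicts of stats_buffer.
    print stats_buffer.nodes.keys()
    print stats_buffer.bucket_info.keys()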
171 diskqueue_stats.py
@@ -0,0 +1,171 @@
+import stats_buffer
+import util_cli as util
+
+class AvgDiskQueue:
+ def run(self, accessor):
+ result = {}
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ #print bucket, stats_info
+ disk_queue_avg_error = []
+ disk_queue_avg_warn = []
+ values = stats_info[accessor["scale"]][accessor["counter"]]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ for node, vals in nodeStats.iteritems():
+ avg = sum(vals) / samplesCount
+ if avg > accessor["threshold"]["high"]:
+ disk_queue_avg_error.append({"node":node, "level":"red", "value":avg})
+ elif avg > accessor["threshold"]["low"]:
+ disk_queue_avg_warn.append({"node":node, "level":"yellow", "value":avg})
+ if len(disk_queue_avg_error) > 0:
+ result[bucket] = {"error" : disk_queue_avg_error}
+ if len(disk_queue_avg_warn) > 0:
+ result[bucket] = {"warn" : disk_queue_avg_warn}
+ return result
+
+class DiskQueueTrend:
+ def run(self, accessor):
+ result = {}
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ trend_error = []
+ trend_warn = []
+ values = stats_info[accessor["scale"]][accessor["counter"]]
+ timestamps = values["timestamp"]
+ timestamps = [x - timestamps[0] for x in timestamps]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ for node, vals in nodeStats.iteritems():
+ a, b = util.linreg(timestamps, vals)
+ if a > accessor["threshold"]["high"]:
+ trend_error.append({"node":node, "level":"red", "value":a})
+ elif a > accessor["threshold"]["low"]:
+ trend_warn.append({"node":node, "level":"yellow", "value":a})
+ if len(trend_error) > 0:
+ result[bucket] = {"error" : trend_error}
+ if len(trend_warn) > 0:
+ result[bucket] = {"warn" : trend_warn}
+ return result
+
+class TapQueueTrend:
+ def run(self, accessor):
+ result = {}
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ trend_error = []
+ trend_warn = []
+ values = stats_info[accessor["scale"]][accessor["counter"]]
+ timestamps = values["timestamp"]
+ timestamps = [x - timestamps[0] for x in timestamps]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ for node, vals in nodeStats.iteritems():
+ a, b = util.linreg(timestamps, vals)
+ if a > accessor["threshold"]["high"]:
+ trend_error.append({"node":node, "level":"red", "value":a})
+ elif a > accessor["threshold"]["low"]:
+ trend_warn.append({"node":node, "level":"yellow", "value":a})
+ if len(trend_error) > 0:
+ result[bucket] = {"error" : trend_error}
+ if len(trend_warn) > 0:
+ result[bucket] = {"warn" : trend_warn}
+ return result
+
+class DiskQueueDrainingRate:
+ def run(self, accessor):
+ result = {}
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ #print bucket, stats_info
+ disk_queue_avg_error = []
+ disk_queue_avg_warn = []
+ drain_values = stats_info[accessor["scale"]][accessor["counter"][0]]
+ len_values = stats_info[accessor["scale"]][accessor["counter"][1]]
+ nodeStats = drain_values["nodeStats"]
+ samplesCount = drain_values["samplesCount"]
+ for node, vals in nodeStats.iteritems():
+ avg = sum(vals) / samplesCount
+ disk_len_vals = len_values["nodeStats"][node]
+ len_avg = sum(disk_len_vals) / samplesCount
+ if avg < accessor["threshold"]["drainRate"] and len_avg > accessor["threshold"]["diskLength"]:
+ disk_queue_avg_error.append({"node":node, "level":"red", "value":avg})
+ if len(disk_queue_avg_error) > 0:
+ result[bucket] = {"error" : disk_queue_avg_error}
+ return result
+
+DiskQueueCapsule = [
+ {"name" : "DiskQueueDiagnosis",
+ "description" : "",
+ "ingredients" : [
+ {
+ "name" : "avgDiskQueueLength",
+ "description" : "Persistence severely behind - average disk queue length is above threshold",
+ "counter" : "disk_write_queue",
+ "pernode" : True,
+ "scale" : "minute",
+ "code" : "AvgDiskQueue",
+ "threshold" : {
+ "low" : 50000000,
+ "high" : 1000000000
+ },
+ },
+ {
+ "name" : "diskQueueTrend",
+ "description" : "Persistence severely behind - disk write queue continues growing",
+ "counter" : "disk_write_queue",
+ "pernode" : True,
+ "scale" : "hour",
+ "code" : "DiskQueueTrend",
+ "threshold" : {
+ "low" : 0,
+ "high" : 0.25
+ },
+ },
+ ],
+ "indicator" : True,
+ },
+ {"name" : "ReplicationTrend",
+ "ingredients" : [
+ {
+ "name" : "replicationTrend",
+ "description" : "Replication severely behind - tap queue backlog keeps growing",
+ "counter" : "ep_tap_total_total_backlog_size",
+ "pernode" : True,
+ "scale" : "hour",
+ "code" : "TapQueueTrend",
+ "threshold" : {
+ "low" : 0,
+ "high" : 0.2
+ },
+ }
+ ],
+ "indicator" : True,
+ },
+ {"name" : "DiskQueueDrainingAnalysis",
+ "description" : "",
+ "ingredients" : [
+ {
+ "name" : "activeDiskQueueDrainRate",
+ "description" : "Persistence severely behind - active disk queue draining rate is below threshold",
+ "counter" : ["vb_active_queue_drain", "disk_write_queue"],
+ "pernode" : True,
+ "scale" : "minute",
+ "code" : "DiskQueueDrainingRate",
+ "threshold" : {
+ "drainRate" : 0,
+ "diskLength" : 100000,
+ },
+ },
+ {
+ "name" : "replicaDiskQueueDrainRate",
+ "description" : "Persistence severely behind - replica disk queue draining rate is below threshold",
+ "counter" : ["vb_replica_queue_drain", "disk_write_queue"],
+ "pernode" : True,
+ "scale" : "minute",
+ "code" : "DiskQueueDrainingRate",
+ "threshold" : {
+ "drainRate" : 0,
+ "diskLength" : 100000,
+ },
+ },
+ ],
+ "indicator" : True,
+ },
+]
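A self-contained check of the AvgDiskQueue thresholds defined above; the node address and samples are invented and follow the same stats_buffer layout the collector produces:

    import stats_buffer
    import diskqueue_stats

    stats_buffer.buckets['default'] = {
        'minute': {
            'disk_write_queue': {
                'samplesCount': 2,
                # Average of 70M falls between the low (50M) and high (1G)
                # thresholds, so this node should be reported as a warning.
                'nodeStats': {'10.1.1.1:8091': [60000000, 80000000]},
            }
        }
    }
    accessor = {'counter': 'disk_write_queue', 'scale': 'minute',
                'threshold': {'low': 50000000, 'high': 1000000000}}
    print diskqueue_stats.AvgDiskQueue().run(accessor)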
4 info.py
@@ -33,7 +33,9 @@ def runCmd(self, cmd, server, port,
for x in ['license', 'licenseValid', 'licenseValidUntil']:
if x in json:
del(json[x])
- if cmd == 'server-eshell':
+ if cmd == 'get-server-info':
+ return json
+ elif cmd == 'server-eshell':
p = subprocess.call(['erl','-name','ctl@127.0.0.1',
'-setcookie',json['otpCookie'],'-hidden','-remsh',json['otpNode']])
else:
4 listservers.py
@@ -34,7 +34,9 @@ def runCmd(self, cmd, server, port,
self.port,
self.user,
self.password)
- if (self.output == 'json'):
+ if (self.output == 'return'):
+ return self.getNodes(data)
+ elif (self.output == 'json'):
print data
else:
# obtain dict of nodes. If not dict, is error message
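Together with the info.py change above, this gives the collector a programmatic path instead of printed output. A sketch of the two new return branches, with placeholder host and credentials:

    import listservers
    import info
    import util_cli as util

    opts = [('-o', 'return')]
    nodes = listservers.ListServers().runCmd('host-list', '127.0.0.1', 8091,
                                             'Administrator', 'password', opts)
    for n in nodes:
        host, port = util.hostport(n['hostname'])
        print info.Info().runCmd('get-server-info', host, port,
                                 'Administrator', 'password', opts)['version']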
383 node_stats.py
@@ -0,0 +1,383 @@
+
+import stats_buffer
+import util_cli as util
+
+class NodeList:
+ def run(self, accessor):
+ result = []
+ for node, node_info in stats_buffer.nodes.iteritems():
+ result.append({"ip": node, "port": node_info['port'], "version" :node_info['version'], "os": node_info['os'], "status" :node_info['status']})
+ return result
+
+class NumNodes:
+ def run(self, accessor):
+ result = []
+ result.append(len(stats_buffer.nodes))
+ return result
+
+class NumDownNodes:
+ def run(self, accessor):
+ result = []
+ result.append(len(filter(lambda (a, b): b["status"]=="down", stats_buffer.nodes.items())))
+ return result
+
+class NumWarmupNodes:
+ def run(self, accessor):
+ result = []
+ result.append(len(filter(lambda (a, b): b["status"]=="warmup", stats_buffer.nodes.items())))
+ return result
+
+class NumFailOverNodes:
+ def run(self, accessor):
+ result = []
+ result.append(len(filter(lambda (a, b): b["clusterMembership"]!="active", stats_buffer.nodes.items())))
+ return result
+
+class BucketList:
+ def run(self, accessor):
+ result = []
+ for bucket in stats_buffer.bucket_info.keys():
+ result.append({"name": bucket})
+
+ return result
+
+class NodeStorageStats:
+ def run(self, accessor):
+ result = []
+ for node, values in stats_buffer.nodes.iteritems():
+ if values["StorageInfo"].has_key("hdd"):
+ result.append({"ip": values["host"],
+ "port": values["port"],
+ "type" : "hdd",
+ "free": util.size_label(values["StorageInfo"]["hdd"]["free"]),
+ "quotaTotal" : util.size_label(values["StorageInfo"]["hdd"]["quotaTotal"]),
+ "used" : util.size_label(values["StorageInfo"]["hdd"]["used"]),
+ "usedByData" : util.size_label(values["StorageInfo"]["hdd"]["usedByData"]),
+ "total" : util.size_label(values["StorageInfo"]["hdd"]["total"])})
+ if values["StorageInfo"].has_key("ram"):
+ result.append({"ip": values["host"],
+ "port": values["port"],
+ "type" : "ram",
+ "quotaTotal" : util.size_label(values["StorageInfo"]["ram"]["quotaTotal"]),
+ "used" : util.size_label(values["StorageInfo"]["ram"]["used"]),
+ "usedByData" : util.size_label(values["StorageInfo"]["ram"]["usedByData"]),
+ "total" : util.size_label(values["StorageInfo"]["ram"]["total"])})
+ return result
+
+class NodeSystemStats:
+ def run(self, accessor):
+ result = []
+ for node, values in stats_buffer.nodes.iteritems():
+ result.append({"ip": values["host"],
+ "port": values["port"],
+ "cpuUtilization" :util.pretty_float(values["systemStats"]["cpu_utilization_rate"]),
+ "swapTotal": util.size_label(values["systemStats"]["swap_total"]),
+ "swapUsed" : util.size_label(values["systemStats"]["swap_used"]),
+ "currentItems" : values["systemStats"]["currentItems"],
+ "currentItemsTotal" : values["systemStats"]["currentItemsTotal"],
+ "replicaCurrentItems" : values["systemStats"]["replicaCurrentItems"]})
+
+ return result
+
+class ConnectionTrend:
+ def run(self, accessor):
+ result = {}
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ values = stats_info[accessor["scale"]][accessor["counter"]]
+ timestamps = values["timestamp"]
+ timestamps = [x - timestamps[0] for x in timestamps]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ trend = []
+ for node, vals in nodeStats.iteritems():
+ a, b = util.linreg(timestamps, vals)
+ trend.append((node, a, vals[-1]))
+ result[bucket] = trend
+ return result
+
+class CalcTrend:
+ def run(self, accessor):
+ result = {}
+ for bucket, stats_info in stats_buffer.buckets.iteritems():
+ values = stats_info[accessor["scale"]][accessor["counter"]]
+ timestamps = values["timestamp"]
+ timestamps = [x - timestamps[0] for x in timestamps]
+ nodeStats = values["nodeStats"]
+ samplesCount = values["samplesCount"]
+ trend = []
+ for node, vals in nodeStats.iteritems():
+ a, b = util.linreg(timestamps, vals)
+ trend.append((node, a))
+ result[bucket] = trend
+ return result
+
+class NodePerformanceStats:
+ def run(self, accessor):
+ result = {}
+ for bucket, bucket_stats in stats_buffer.node_stats.iteritems():
+ stats = []
+ for node, stats_info in bucket_stats.iteritems():
+
+ for key, value in stats_info.iteritems():
+ if key.find(accessor["counter"]) >= 0:
+ if accessor.has_key("threshold"):
+ if int(value) > accessor["threshold"]:
+ stats.append((node, (key, value)))
+ else:
+ if accessor.has_key("unit"):
+ if accessor["unit"] == "time":
+ stats.append((node, util.time_label(value)))
+ elif accessor["unit"] == "size":
+ stats.append((node, util.size_label(int(value))))
+ else:
+ stats.append((node, (key,value)))
+
+ result[bucket] = stats
+ return result
+
+NodeCapsule = [
+ {"name" : "NodeStatus",
+ "ingredients" : [
+ {
+ "name" : "nodeList",
+ "description" : "Node list",
+ "code" : "NodeList",
+ },
+ {
+ "name" : "numNodes",
+ "description" : "Number of Nodes",
+ "code" : "NumNodes",
+ },
+ {
+ "name" : "numDownNodes",
+ "description" : "Number of Down Nodes",
+ "code" : "NumDownNodes",
+ },
+ {
+ "name" : "numWarmupNodes",
+ "description" : "Number of Warmup Nodes",
+ "code" : "NumWarmupNodes",
+ },
+ {
+ "name" : "numFailedOverNodes",
+ "description" : "Number of Nodes failed over",
+ "code" : "NumFailOverNodes",
+ },
+ ],
+ "clusterwise" : False,
+ "nodewise" : True,
+ "perNode" : False,
+ "perBucket" : False,
+ },
+ {"name" : "NumberOfConnection",
+ "ingredients" : [
+ {
+ "name" : "connectionTrend",
+ "description" : "Connection Trend",
+ "counter" : "curr_connections",
+ "scale" : "minute",
+ "code" : "ConnectionTrend",
+ "threshold" : {
+ "high" : 10000,
+ },
+ },
+ ],
+ "nodewise" : True,
+ "perNode" : True,
+ },
+ {"name" : "OOMError",
+ "ingredients" : [
+ {
+ "name" : "oomErrors",
+ "description" : "OOM Errors",
+ "counter" : "ep_oom_errors",
+ "scale" : "hour",
+ "code" : "CalcTrend",
+ },
+ {
+ "name" : "tempOomErrors",
+ "description" : "Temporary OOM Errors",
+ "counter" : "ep_tmp_oom_errors",
+ "scale" : "hour",
+ "code" : "CalcTrend",
+ },
+ ]
+ },
+ {"name" : "bucketList",
+ "ingredients" : [
+ {
+ "name" : "bucketList",
+ "description" : "Bucket list",
+ "code" : "BucketList",
+ },
+ ],
+ "nodewise" : True,
+ },
+ {"name" : "nodeStorageStats",
+ "ingredients" : [
+ {
+ "name" : "nodeStorageStats",
+ "description" : "Node storage stats",
+ "code" : "NodeStorageStats",
+ },
+ ],
+ "nodewise" : True,
+ },
+ {"name" : "nodeSystemStats",
+ "ingredients" : [
+ {
+ "name" : "nodeSystemStats",
+ "description" : "Node system stats",
+ "code" : "NodeSystemStats",
+ },
+ ],
+ "nodewise" : True,
+ },
+ {"name" : "tapPerformance",
+ "ingredients" : [
+ {
+ "name" : "backfillRemaining",
+ "description" : "Number of backfill remaining",
+ "counter" : "ep_tap_queue_backfillremaining",
+ "code" : "NodePerformanceStats",
+ },
+ {
+ "name" : "tapNack",
+ "description" : "Number of nacks",
+ "counter" : "num_tap_nack",
+ "code" : "NodePerformanceStats",
+ },
+ {
+ "name" : "tapIdle",
+ "description" : "Idle tap streams",
+ "counter" : "idle",
+ "code" : "NodePerformanceStats",
+ },
+ ],
+ "perBucket" : True,
+ },
+ {"name" : "checkpointPerformance",
+ "ingredients" : [
+ {
+ "name" : "openCheckPoint",
+ "description" : "Items for open checkpoints",
+ "counter" : "num_checkpoint_items",
+ "code" : "NodePerformanceStats",
+ "threshold" : 10000,
+ },
+ ],
+ "perBucket" : True,
+ },
+ {"name" : "diskPerformance",
+ "ingredients" : [
+ {
+ "name" : "diskCommit",
+ "description" : "Average disk commit time",
+ "counter" : "disk_commit",
+ "code" : "NodePerformanceStats",
+ "unit" : "time",
+ },
+ {
+ "name" : "diskUpdate",
+ "description" : "Average disk update time",
+ "counter" : "disk_update",
+ "code" : "NodePerformanceStats",
+ "unit" : "time",
+ },
+ {
+ "name" : "diskInsert",
+ "description" : "Average disk insert time",
+ "counter" : "disk_insert",
+ "code" : "NodePerformanceStats",
+ "unit" : "time",
+ },
+ {
+ "name" : "diskDelete",
+ "description" : "Average disk delete time",
+ "counter" : "disk_del",
+ "code" : "NodePerformanceStats",
+ "unit" : "time",
+ },
+ ],
+ "perBucket" : True,
+ },
+ {"name" : "AverageDocumentSize",
+ "ingredients" : [
+ {
+ "name" : "averageDocumentSize",
+ "description" : "Average Document Size",
+ "counter" : "item_alloc_sizes",
+ "code" : "NodePerformanceStats",
+ "unit" : "size",
+ },
+ ],
+ "perBucket" : True,
+ },
+ {"name" : "MemoryUsage",
+ "ingredients" : [
+ {
+ "name" : "totalMemoryUsage",
+ "description" : "Total memory usage",
+ "counter" : "total_heap_bytes",
+ "code" : "NodePerformanceStats",
+ "unit" : "size",
+ },
+ {
+ "name" : "totalFragmentation",
+ "description" : "Total memory fragmentation",
+ "counter" : "total_fragmentation_bytes",
+ "code" : "NodePerformanceStats",
+ "unit" : "size",
+ },
+ {
+ "name" : "totalInternalMemory",
+ "description" : "Total internal memory usage",
+ "counter" : "mem_used",
+ "code" : "NodePerformanceStats",
+ "unit" : "size",
+ },
+ {
+ "name" : "overhead",
+ "description" : "Memory overhead",
+ "counter" : "ep_overhead",
+ "scale" : "hour",
+ "code" : "NodePerformanceStats",
+ "unit" : "size",
+ },
+ ],
+ "perBucket" : True,
+ },
+ {"name" : "EPEnginePerformance",
+ "ingredients" : [
+ {
+ "name" : "flusherState",
+ "description" : "Engine flusher state",
+ "counter" : "ep_flusher_state",
+ "code" : "NodePerformanceStats",
+ },
+ {
+ "name" : "flusherCompleted",
+ "description" : "Flusher completed",
+ "counter" : "ep_flusher_num_completed",
+ "code" : "NodePerformanceStats",
+ },
+ {
+ "name" : "avgItemLoadTime",
+ "description" : "Average item loaded time",
+ "counter" : "ep_bg_load_avg",
+ "code" : "NodePerformanceStats",
+ "unit" : "time"
+ },
+ {
+ "name" : "avgItemWaitTime",
+ "description" : "Average item waited time",
+ "counter" : "ep_bg_wait_avg",
+ "code" : "NodePerformanceStats",
+ "unit" : "time",
+ },
+ ],
+ "perNode" : True,
+ },
+]
+
+
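A minimal sketch of the simplest node-level checkers above, fed with one fabricated node entry shaped like what collector.py records:

    import stats_buffer
    import node_stats

    stats_buffer.nodes['10.1.1.1:8091'] = {
        'host': '10.1.1.1', 'port': 8091, 'version': '1.8.0',
        'os': 'x86_64-unknown-linux-gnu', 'status': 'healthy',
        'clusterMembership': 'active',
    }
    print node_stats.NodeList().run(None)      # one entry with ip/port/version/os/status
    print node_stats.NumNodes().run(None)      # [1]
    print node_stats.NumDownNodes().run(None)  # [0]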
97 processor.py
@@ -0,0 +1,97 @@
+import datetime
+import logging
+import util_cli as util
+import sys
+
+import cluster_stats
+import diskqueue_stats
+import node_stats
+
+import stats_buffer
+
+capsules = [
+ (node_stats.NodeCapsule, "node_stats"),
+ (cluster_stats.ClusterCapsule, "cluster_stats"),
+ (diskqueue_stats.DiskQueueCapsule, "diskqueue_stats"),
+]
+
+node_list = {}
+bucket_list = []
+cluster_symptoms = {}
+bucket_symptoms = {}
+bucket_node_symptoms = {}
+node_symptoms = {}
+indicator_error = {}
+indicator_warn = {}
+node_disparate = {}
+
+class StatsAnalyzer:
+ def __init__(self, log):
+ self.log = log
+
+ def run_analysis(self):
+
+ for bucket in stats_buffer.buckets.iterkeys():
+ bucket_list.append(bucket)
+ bucket_symptoms[bucket] = []
+ bucket_node_symptoms[bucket] = {}
+
+ for capsule, package_name in capsules:
+ for pill in capsule:
+ self.log.debug(pill['name'])
+ for counter in pill['ingredients']:
+ result = eval("{0}.{1}().run(counter)".format(package_name, counter['code']))
+
+ self.log.debug(counter)
+ if pill.has_key("clusterwise") and pill["clusterwise"] :
+ if isinstance(result, dict):
+ if result.has_key("cluster"):
+ cluster_symptoms[counter["name"]] = {"description" : counter["description"], "value":result["cluster"]}
+ else:
+ cluster_symptoms[counter["name"]] = {"description" : counter["description"], "value":result}
+ else:
+ cluster_symptoms[counter["name"]] = {"description" : counter["description"], "value":result}
+
+ if pill.has_key("perBucket") and pill["perBucket"] :
+ #bucket_symptoms[counter["name"]] = {"description" : counter["description"], "value":result}
+ for bucket, values in result.iteritems():
+ if bucket == "cluster":
+ continue
+ for val in values:
+ if val[0] == "variance":
+ continue
+ elif val[0] == "total":
+ bucket_symptoms[bucket].append({"description" : counter["description"], "value" : values[-1][1]})
+ else:
+ if bucket_node_symptoms[bucket].has_key(val[0]) == False:
+ bucket_node_symptoms[bucket][val[0]] = []
+ bucket_node_symptoms[bucket][val[0]].append({"description" : counter["description"], "value" : val[1]})
+
+ if pill.has_key("perNode") and pill["perNode"] :
+ node_symptoms[counter["name"]] = {"description" : counter["description"], "value":result}
+ if pill.has_key("nodewise") and pill["nodewise"]:
+ node_list[counter["name"]] = {"description" : counter["description"], "value":result}
+
+ def run_report(self, txtfile):
+ dict = {
+ "globals" : globals,
+ "cluster_symptoms" : cluster_symptoms,
+ "bucket_symptoms" : bucket_symptoms,
+ "bucket_node_symptoms" : bucket_node_symptoms,
+ "node_symptoms" : node_symptoms,
+ "node_list" : node_list,
+ "bucket_list" : bucket_list,
+ }
+
+ f = open(txtfile, 'w')
+ report = {}
+ report["Report Time"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ report["Nodelist Overview"] = node_list
+ report["Cluster Overview"] = cluster_symptoms
+ report["Node Metrics"] = node_symptoms
+ report["Bucket Metrics"] = bucket_symptoms
+ report["Bucket Node Metrics"] = bucket_node_symptoms
+
+ print >> f, util.pretty_print(report)
+ f.close()
+ sys.stderr.write("The run finished successfully. Please find output result at '{0}'".format(txtfile))
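The dispatch in run_analysis is string-driven: each ingredient's "code" names a class inside the capsule's module and is resolved with eval. A hedged illustration with one ingredient taken from ClusterCapsule earlier in this commit:

    import cluster_stats

    package_name = 'cluster_stats'
    counter = {'name': 'dgm', 'description': 'Disk to Memory Ratio', 'code': 'DGMRatio'}
    # Equivalent to cluster_stats.DGMRatio().run(counter)
    result = eval('{0}.{1}().run(counter)'.format(package_name, counter['code']))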
35 stats_buffer.py
@@ -0,0 +1,35 @@
+nodes = {}
+node_stats = {}
+
+buckets_summary = {}
+stats_summary = {}
+
+bucket_info = {}
+buckets = {}
+stats = {
+ "minute" : {
+ 'disk_write_queue' : {},
+ 'cmd_get' : {},
+ 'cmd_set' : {},
+ 'delete_hits' : {},
+ 'curr_items' : {},
+ 'vb_replica_curr_items' : {},
+ 'curr_connections' : {},
+ 'vb_active_queue_drain' : {},
+ 'vb_replica_queue_drain' : {},
+ 'disk_write_queue' : {},
+ },
+ "hour" : {
+ 'disk_write_queue' : {},
+ 'ep_cache_miss_rate' : {},
+ 'ep_tap_total_total_backlog_size' : { },
+ 'ep_oom_errors' : {},
+ 'ep_tmp_oom_errors' : {},
+ 'vb_active_num' : {},
+ 'vb_replica_num' : {},
+ "mem_used" : {},
+ },
+ "day" : {
+ 'curr_items' : {},
+ },
+}
87 util_cli.py
@@ -1,6 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import json
+import math
+import itertools
+
+BIG_VALUE = 2 ** 60
+SMALL_VALUE = - (2 ** 60)
+
def hostport(hoststring, default_port=8091):
""" finds the host and port given a host:port string """
try:
@@ -11,3 +18,83 @@ def hostport(hoststring, default_port=8091):
port = default_port
return (host, port)
+
+def time_label(s):
+ # -(2**64) -> '-inf'
+ # 2**64 -> 'inf'
+ # 0 -> '0'
+ # 4 -> '4us'
+ # 838384 -> '838ms'
+ # 8283852 -> '8s'
+ if s > BIG_VALUE:
+ return 'inf'
+ elif s < SMALL_VALUE:
+ return '-inf'
+ elif s == 0:
+ return '0'
+ product = 1
+ sizes = (('us', 1), ('ms', 1000), ('s', 1000), ('m', 60))
+ sizeMap = []
+ for l,sz in sizes:
+ product = sz * product
+ sizeMap.insert(0, (l, product))
+ lbl, factor = itertools.dropwhile(lambda x: x[1] > s, sizeMap).next()
+ return "%d %s" % (s / factor, lbl)
+
+def size_label(s):
+ if type(s) in (int, long, float, complex) :
+ if s == 0:
+ return "0"
+ sizes=['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB']
+ e = math.floor(math.log(abs(s), 1024))
+ suffix = sizes[int(e)]
+ return "%d %s" % (s/(1024 ** math.floor(e)), suffix)
+ else:
+ return s
+
+def linreg(X, Y):
+ """
+ Summary
+ Linear regression of y = ax + b
+ Usage
+ real, real, real = linreg(list, list)
+ Returns coefficients to the regression line "y=ax+b" from x[] and y[], and R^2 Value
+ """
+ if len(X) != len(Y): raise ValueError, 'unequal length'
+ N = len(X)
+ Sx = Sy = Sxx = Syy = Sxy = 0.0
+ for x, y in map(None, X, Y):
+ Sx = Sx + x
+ Sy = Sy + y
+ Sxx = Sxx + x*x
+ Syy = Syy + y*y
+ Sxy = Sxy + x*y
+ det = Sxx * N - Sx * Sx
+ if det == 0:
+ return 0, 0
+ else:
+ a, b = (Sxy * N - Sy * Sx)/det, (Sxx * Sy - Sx * Sxy)/det
+ return a, b
+
+def two_pass_variance(data):
+ n = 0
+ sum1 = 0
+ sum2 = 0
+
+ for x in data:
+ n = n + 1
+ sum1 = sum1 + x
+
+ mean = sum1/n
+ for x in data:
+ sum2 = sum2 + (x - mean)*(x - mean)
+ if n <= 1:
+ return 0
+ variance = sum2/(n - 1)
+ return variance
+
+def pretty_float(number, precision=2):
+ return '%.*f' % (precision, number)
+
+def pretty_print(obj):
+ return json.dumps(obj, indent=4, sort_keys=True)
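A quick sanity check of the new helpers (values chosen by hand, not taken from the commit):

    import util_cli as util

    print util.size_label(1572864)                 # '1 MB'
    print util.pretty_float(2.0 / 3.0)             # '0.67'
    print util.linreg([0, 1, 2, 3], [1, 3, 5, 7])  # (2.0, 1.0)
    print util.two_pass_variance([2.0, 4.0, 6.0])  # 4.0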
