Skip to content
Browse files

integrate with template engine

  • Loading branch information...
1 parent a83a553 commit 10a0fca1c173f0bee09505cfb7914fad0bf81208 @bcui6611 committed
Showing with 270 additions and 115 deletions.
  1. +88 −24 analyzer.py
  2. +13 −2 bucket_stats.py
  3. +32 −56 cluster_stats.py
  4. +1 −1 dbaccessor.py
  5. +52 −28 diskqueue_stats.py
  6. +1 −1 healthChecker.py
  7. +83 −3 node_stats.py
View
112 analyzer.py
@@ -1,3 +1,4 @@
+import datetime
import dbaccessor
import util
@@ -6,16 +7,31 @@
import diskqueue_stats
import node_stats
+import stats_buffer
+
+from Cheetah.Template import Template
+
capsules = [
+ (node_stats.NodeCapsule, "node_stats"),
(cluster_stats.ClusterCapsule, "cluster_stats"),
#(bucket_stats.BucketCapsule, "bucket_stats"),
(diskqueue_stats.DiskQueueCapsule, "diskqueue_stats"),
- (node_stats.NodeCapsule, "node_stats"),
]
-cluster_symptoms = []
-bucket_symptoms = []
-node_symptoms = []
+globals = {
+ "versions" : "1.0",
+ "report_time" : datetime.datetime.now(),
+ "cluster_health" : "ok",
+}
+
+node_list = {}
+bucket_list = []
+cluster_symptoms = {}
+bucket_symptoms = {}
+bucket_node_symptoms = {}
+node_symptoms = {}
+indicator_error = {}
+indicator_warn = {}
def format_output(counter, result):
if len(result) == 1:
@@ -32,7 +48,12 @@ def __init__(self):
def run_analysis(self):
self.accessor.connect_db()
- #self.accessor.browse_db()
+ self.accessor.browse_db()
+
+ for bucket in stats_buffer.buckets.iterkeys():
+ bucket_list.append(bucket)
+ bucket_symptoms[bucket] = []
+ bucket_node_symptoms[bucket] = {}
for capsule, package_name in capsules:
for pill in capsule:
@@ -45,38 +66,81 @@ def run_analysis(self):
elif counter['type'] == 'python':
result = eval("{0}.{1}().run(counter)".format(package_name, counter['code']))
- if counter.has_key("unit") and counter["unit"] == "GB":
- util.pretty_print({counter["description"] : result})
- else:
- util.pretty_print({counter["description"] : result})
+ #if counter.has_key("unit") and counter["unit"] == "GB":
+ # util.pretty_print({counter["description"] : result})
+ #else:
+ # util.pretty_print({counter["description"] : result})
#print counter
if pill.has_key("clusterwise") and pill["clusterwise"] :
if isinstance(result, dict):
if result.has_key("cluster"):
- cluster_symptoms.append({counter["description"] : result["cluster"]})
+ if counter.has_key("unit") and counter["unit"] == "GB":
+ cluster_symptoms[counter["name"]] = {"description" : counter["description"], "value": util.humanize_bytes(result["cluster"])}
+ else:
+ cluster_symptoms[counter["name"]] = {"description" : counter["description"], "value":result["cluster"]}
else:
- cluster_symptoms.append({counter["description"] : result})
+ cluster_symptoms[counter["name"]] = {"description" : counter["description"], "value":result}
else:
- cluster_symptoms.append({counter["description"] : result})
+ cluster_symptoms[counter["name"]] = {"description" : counter["description"], "value":result}
if pill.has_key("perBucket") and pill["perBucket"] :
- bucket_symptoms.append({counter["description"] :result})
+ #bucket_symptoms[counter["name"]] = {"description" : counter["description"], "value":result}
+ for bucket, values in result.iteritems():
+ if bucket == "cluster":
+ continue
+ if values[-1][0] == "total":
+ bucket_symptoms[bucket].append({"description" : counter["description"], "value" : values[-1][1]})
+ for val in values[:-1]:
+ if bucket_node_symptoms[bucket].has_key(val[0]) == False:
+ bucket_node_symptoms[bucket][val[0]] = []
+ bucket_node_symptoms[bucket][val[0]].append({"description" : counter["description"], "value" : val[1]})
+
if pill.has_key("perNode") and pill["perNode"] :
- node_symptoms.append({counter["description"] :result})
-
+ node_symptoms[counter["name"]] = {"description" : counter["description"], "value":result}
+ if pill.has_key("nodewise") and pill["nodewise"]:
+ node_list[counter["name"]] = {"description" : counter["description"], "value":result}
+
+ if pill.has_key("indicator") and pill["indicator"] :
+ if len(result) > 0:
+ for bucket,values in result.iteritems():
+ if values.has_key("error"):
+ indicator_error[counter["name"]] = {"description" : counter["description"], "bucket": bucket, "value":values["error"]}
+ if values.has_key("warn"):
+ indicator_warn[counter["name"]] = {"description" : counter["description"], "bucket": bucket, "value":values["warn"]}
+
self.accessor.close()
self.accessor.remove_db()
def run_report(self):
- print "Cluster Overview"
- for symptom in cluster_symptoms:
- util.pretty_print(symptom)
+ dict = {
+ "globals" : globals,
+ "cluster_symptoms" : cluster_symptoms,
+ "bucket_symptoms" : bucket_symptoms,
+ "bucket_node_symptoms" : bucket_node_symptoms,
+ "node_symptoms" : node_symptoms,
+ "node_list" : node_list,
+ "bucket_list" : bucket_list,
+ "indicator_warn" : indicator_warn,
+ "indicator_error" : indicator_error,
+ }
- print "Bucket Metrics"
- for symptom in bucket_symptoms:
- util.pretty_print(symptom)
+ debug = True
+ if debug:
+ print "Nodelist Overview"
+ util.pretty_print(node_list)
- print "Node Metrics"
- for symptom in node_symptoms:
- util.pretty_print(symptom)
+ print "Cluster Overview"
+ util.pretty_print(cluster_symptoms)
+
+ print "Bucket Metrics"
+ util.pretty_print(bucket_symptoms)
+
+ print "Bucket Node Metrics"
+ util.pretty_print(bucket_node_symptoms)
+
+ print "Key indicators"
+ util.pretty_print(indicator_error)
+ util.pretty_print(indicator_warn)
+
+ #print Template(file="report-htm.tmpl", searchList=[dict])
View
15 bucket_stats.py
@@ -1,7 +1,7 @@
import dbaccessor
import stats_buffer
import util
-
+
class OpsRatio:
def run(self, accessor):
ops_avg = {
@@ -105,7 +105,18 @@ def run(self, accessor):
return trend
BucketCapsule = [
- {"name" : "Cache Miss Ratio",
+ {"name" : "bucketList",
+ "ingredients" : [
+ {
+ "name" : "bucketList",
+ "description" : "Bucket list",
+ "type" : "pythonSQL",
+ "code" : "BucketList",
+ },
+ ],
+ "perBucket" : True,
+ },
+ {"name" : "CacheMissRatio",
"ingredients" : [
{
"description" : "Cache miss ratio",
View
88 cluster_stats.py
@@ -7,10 +7,6 @@ def run(self, accessor, stmt):
result = accessor.execute(stmt)
return result[0]
-class NodeList:
- def run(self, accessor):
- return accessor.execute("SELECT host, port, version, os, status FROM ServerNode", True)
-
class DGMRatio:
def run(self, accessor):
hdd = accessor.execute("SELECT sum(usedbyData) FROM StorageInfo WHERE type='hdd'")
@@ -147,52 +143,18 @@ def run(self, accessor):
class NumVbuckt:
    """Indicator: report buckets whose latest vbucket count is below a threshold."""

    def run(self, accessor):
        """Compare each bucket's most recent summary sample with the threshold.

        accessor -- despite the name, this is indexed like the ingredient
                    dict: it must provide "counter" (the stat to look up)
                    and "threshold" (the minimum acceptable value).

        Returns a dict mapping bucket name to {"error": value} for every
        bucket whose last sample is below the threshold.  Buckets at or above
        the threshold, and buckets without samples, are left out.
        """
        result = {}
        for bucket in stats_buffer.buckets_summary.iterkeys():
            total, values = stats_buffer.retrieveSummaryStats(bucket, accessor["counter"])
            # No samples collected for this bucket: nothing to judge.
            if len(values) == 0:
                continue
            latest = values[-1]
            if latest < accessor["threshold"]:
                # The capsule is flagged as an indicator, and the indicator
                # consumer looks up "error"/"warn" keys on every bucket value,
                # so a bare number would break it.  Wrap the value.
                result[bucket] = {"error": latest}
        return result
ClusterCapsule = [
- {"name" : "Node Status",
- "ingredients" : [
- {
- "description" : "Node list",
- "type" : "pythonSQL",
- "code" : "NodeList",
- },
- {
- "description" : "Number of Nodes",
- "type" : "SQL",
- "stmt" : "SELECT count(*) FROM ServerNode",
- "code" : "ExecSQL",
- },
- {
- "description" : "Number of Down Nodes",
- "type" : "SQL",
- "stmt" : "SELECT count(*) FROM ServerNode WHERE status='down'",
- "code" : "ExecSQL",
- },
- {
- "description" : "Number of Warmup Nodes",
- "type" : "SQL",
- "stmt" : "SELECT count(*) FROM ServerNode WHERE status='warmup'",
- "code" : "ExecSQL",
- },
- {
- "description" : "Number of Nodes failed over",
- "type" : "SQL",
- "stmt" : "SELECT count(*) FROM ServerNode WHERE clusterMembership != 'active'",
- "code" : "ExecSQL",
- },
- ],
- "clusterwise" : True,
- "perNode" : False,
- "perBucket" : False,
- },
- {"name" : "Total Data Size",
+ {"name" : "TotalDataSize",
"ingredients" : [
{
+ "name" : "totalDataSize",
"description" : "Total Data Size across cluster",
"type" : "SQL",
"stmt" : "SELECT sum(usedbyData) FROM StorageInfo WHERE type='hdd'",
@@ -204,9 +166,10 @@ def run(self, accessor):
"perNode" : False,
"perBucket" : False,
},
- {"name" : "Available disk space",
+ {"name" : "AvailableDiskSpace",
"ingredients" : [
{
+ "name" : "availableDiskSpace",
"description" : "Available disk space",
"type" : "SQL",
"stmt" : "SELECT sum(free) FROM StorageInfo WHERE type='hdd'",
@@ -218,24 +181,28 @@ def run(self, accessor):
"perNode" : False,
"perBucket" : False,
},
- {"name" : "Cache Miss Ratio",
+ {"name" : "CacheMissRatio",
"ingredients" : [
{
+ "name" : "cacheMissRatio",
"description" : "Cache miss ratio",
"counter" : "ep_cache_miss_rate",
"type" : "python",
"scale" : "hour",
"code" : "CacheMissRatio",
"unit" : "percentage",
+ "threshold" : 2,
},
],
"clusterwise" : True,
"perNode" : True,
"perBucket" : True,
+ "indicator" : False,
},
{"name" : "DGM",
"ingredients" : [
{
+ "name" : "dgm",
"description" : "Disk to Memory Ratio",
"type" : "pythonSQL",
"code" : "DGMRatio"
@@ -245,9 +212,10 @@ def run(self, accessor):
"perNode" : False,
"perBucket" : False,
},
- {"name" : "Active / Replica Resident Ratio",
+ {"name" : "ActiveReplicaResidentRatio",
"ingredients" : [
{
+ "name" : "activeReplicaResidencyRatio",
"description" : "Active and Replica Residentcy Ratio",
"type" : "python",
"counter" : ["curr_items", "vb_replica_curr_items"],
@@ -259,9 +227,10 @@ def run(self, accessor):
"perNode" : True,
"perBucket" : True,
},
- {"name" : "OPS performance",
+ {"name" : "OPSPerformance",
"ingredients" : [
{
+ "name" : "opsPerformance",
"description" : "Read/Write/Delete ops ratio",
"type" : "python",
"scale" : "minute",
@@ -270,9 +239,10 @@ def run(self, accessor):
},
]
},
- {"name" : "Growth Rate",
+ {"name" : "GrowthRate",
"ingredients" : [
{
+ "name" : "dataGrowthRateForItems",
"description" : "Data Growth rate for items",
"counter" : "curr_items",
"type" : "python",
@@ -282,9 +252,10 @@ def run(self, accessor):
},
]
},
- {"name" : "Average Document Size",
+ {"name" : "AverageDocumentSize",
"ingredients" : [
{
+ "name" : "averageDocumentSize",
"description" : "Average Document Size",
"type" : "python",
"code" : "AvgItemSize",
@@ -292,23 +263,28 @@ def run(self, accessor):
},
]
},
- {"name" : "VBucket number",
+ {"name" : "VBucketNumber",
"ingredients" : [
{
- "description" : "Active VBucket number",
+ "name" : "activeVbucketNumber",
+ "description" : "Active VBucket number is less than expected",
"counter" : "vb_active_num",
"type" : "python",
"scale" : "summary",
- "code" : "NumVbuckt"
+ "code" : "NumVbuckt",
+ "threshold" : 1024,
},
{
- "description" : "Replica VBucket number",
+ "name" : "replicaVBucketNumber",
+ "description" : "Replica VBucket number is less than expected",
"counter" : "vb_replica_num",
"type" : "python",
"scale" : "summary",
- "code" : "NumVbuckt"
+ "code" : "NumVbuckt",
+ "threshold" : 1024,
},
- ]
+ ],
+ "indicator" : True,
},
]
View
2 dbaccessor.py
@@ -152,7 +152,7 @@ def process_node_stats(self, nodeId, nodeInfo):
VALUES('{0}', {1}, {2}, {3}, {4}, {5}, {6})"""
if nodeInfo['storageTotals'] is not None:
- print nodeInfo
+ #print nodeInfo
hdd = nodeInfo['storageTotals']['hdd']
if hdd is not None:
self.cursor.execute(sqlstmt.format('hdd',
View
80 diskqueue_stats.py
def run(self, accessor):
    """Flag nodes whose average disk write queue exceeds the thresholds.

    accessor -- indexed like the ingredient dict: "scale" and "counter"
                select the sample series inside stats_buffer.buckets, and
                "threshold" holds the "low" (warning) and "high" (error)
                limits.

    Returns a dict keyed by bucket name.  Each value may contain an "error"
    list, a "warn" list, or both; every list entry is a dict with "node",
    "level" ("red" or "yellow") and "value" (the average).  Buckets without
    any finding are omitted.
    """
    result = {}
    for bucket, stats_info in stats_buffer.buckets.iteritems():
        values = stats_info[accessor["scale"]][accessor["counter"]]
        nodeStats = values["nodeStats"]
        samplesCount = values["samplesCount"]
        # Without samples there is no average to compute (and dividing
        # would raise ZeroDivisionError), so skip the bucket.
        if not samplesCount:
            continue

        disk_queue_avg_error = []
        disk_queue_avg_warn = []
        for node, vals in nodeStats.iteritems():
            avg = sum(vals) / samplesCount
            if avg > accessor["threshold"]["high"]:
                disk_queue_avg_error.append({"node":node, "level":"red", "value":avg})
            elif avg > accessor["threshold"]["low"]:
                disk_queue_avg_warn.append({"node":node, "level":"yellow", "value":avg})

        # Collect both severities in one dict so that warnings never
        # replace errors found for the same bucket.
        findings = {}
        if disk_queue_avg_error:
            findings["error"] = disk_queue_avg_error
        if disk_queue_avg_warn:
            findings["warn"] = disk_queue_avg_warn
        if findings:
            result[bucket] = findings
    return result
- def action(self, values, thresholds):
- flags = []
- for bucket, node, avg in values:
- if avg < thresholds["low"]:
- flags.append((bucket, node, "green"))
- elif avg >= thresholds["low"] and avg < thresholds["high"]:
- flags.append((bucket, node, "yellow"))
- else:
- flags.append((bucket, node, "red"))
- return flags
-
class DiskQueueTrend:
def run(self, accessor):
result = {}
for bucket, stats_info in stats_buffer.buckets.iteritems():
- trend = []
+ trend_error = []
+ trend_warn = []
values = stats_info[accessor["scale"]][accessor["counter"]]
timestamps = values["timestamp"]
timestamps = [x - timestamps[0] for x in timestamps]
@@ -41,15 +38,22 @@ def run(self, accessor):
samplesCount = values["samplesCount"]
for node, vals in nodeStats.iteritems():
a, b = util.linreg(timestamps, vals)
- trend.append((node, a))
- result[bucket] = trend
+ if a > accessor["threshold"]["high"]:
+ trend_error.append({"node":node, "level":"red", "value":a})
+ elif a > accessor["threshold"]["low"]:
+ trend_warn.append({"node":node, "level":"yellow", "value":a})
+ if len(trend_error) > 0:
+ result[bucket] = {"error" : trend_error}
+ if len(trend_warn) > 0:
+ result[bucket] = {"warn" : trend_warn}
return result
class TapQueueTrend:
def run(self, accessor):
result = {}
for bucket, stats_info in stats_buffer.buckets.iteritems():
- trend = []
+ trend_error = []
+ trend_warn = []
values = stats_info[accessor["scale"]][accessor["counter"]]
timestamps = values["timestamp"]
timestamps = [x - timestamps[0] for x in timestamps]
@@ -57,15 +61,23 @@ def run(self, accessor):
samplesCount = values["samplesCount"]
for node, vals in nodeStats.iteritems():
a, b = util.linreg(timestamps, vals)
- trend.append((node, a))
- result[bucket] = trend
+ if a > accessor["threshold"]["high"]:
+ trend_error.append({"node":node, "level":"red", "value":a})
+ elif a > accessor["threshold"]["low"]:
+ trend_warn.append({"node":node, "level":"yellow", "value":a})
+ if len(trend_error) > 0:
+ result[bucket] = {"error" : trend_error}
+ if len(trend_warn) > 0:
+ result[bucket] = {"warn" : trend_warn}
return result
DiskQueueCapsule = [
- {"name" : "Disk Queue Diagnosis",
+ {"name" : "DiskQueueDiagnosis",
+ "description" : "",
"ingredients" : [
{
- "description" : "Avg Disk queue length",
+ "name" : "avgDiskQueueLength",
+ "description" : "Persistence severely behind - averge disk queue length is above threshold",
"counter" : "disk_write_queue",
"pernode" : True,
"scale" : "minute",
@@ -75,27 +87,39 @@ def run(self, accessor):
"low" : 50000000,
"high" : 1000000000
},
- },
+ },
{
- "description" : "Disk queue trend",
+ "name" : "diskQueueTrend",
+ "description" : "Persistence severely behind - disk write queue continues growing",
"counter" : "disk_write_queue",
"pernode" : True,
"scale" : "hour",
"type" : "python",
"code" : "DiskQueueTrend",
+ "threshold" : {
+ "low" : 0,
+ "high" : 0.25
+ },
},
- ]
+ ],
+ "indicator" : True,
},
- {"name" : "Replication trend",
+ {"name" : "ReplicationTrend",
"ingredients" : [
{
- "description" : "Replication Trend",
+ "name" : "replicationTrend",
+ "description" : "Replication severely behind - ",
"counter" : "ep_tap_total_total_backlog_size",
"pernode" : True,
"scale" : "hour",
"type" : "python",
"code" : "TapQueueTrend",
+ "threshold" : {
+ "low" : 0,
+ "high" : 0.2
+ },
}
- ]
+ ],
+ "indicator" : True,
},
]
View
2 healthChecker.py
@@ -146,7 +146,7 @@ def collect_data():
cmd = 'bucket-node-stats'
for scale, stat_set in stats_buffer.buckets[bucket_name].iteritems():
for stat in stat_set.iterkeys():
- print "retieving: ", stat, " scale:", scale
+ #print "retieving: ", stat, " scale:", scale
c = buckets.BucketNodeStats(bucket_name, stat, scale)
json = c.runCmd(cmd, server, port, user, password, opts)
stats_buffer.buckets[bucket_name][scale][stat] = json
View
86 node_stats.py
@@ -2,6 +2,29 @@
import stats_buffer
import util
class ExecSQL:
    """Run a single SQL statement and hand back the first element of its result."""

    def run(self, accessor, stmt):
        """Execute stmt through accessor.execute and return element 0 of what it yields."""
        return accessor.execute(stmt)[0]
+
class NodeList:
    """Build the node inventory from the ServerNode table."""

    def run(self, accessor):
        """Return one dict per ServerNode row.

        Each dict carries the selected columns under the keys "ip" (the host
        column), "port", "version", "os" and "status", in query order.
        """
        rows = accessor.execute("SELECT host, port, version, os, status FROM ServerNode", True)
        return [
            {"ip": row[0], "port": row[1], "version": row[2], "os": row[3], "status": row[4]}
            for row in rows
        ]
+
class BucketList:
    """Build the bucket inventory from the Bucket table."""

    def run(self, accessor):
        """Return a list of {"name": <bucket name>} dicts, one per Bucket row."""
        rows = accessor.execute("SELECT name FROM Bucket", True)
        return [{"name": row[0]} for row in rows]
+
class ConnectionTrend:
def run(self, accessor):
result = {}
@@ -35,9 +58,52 @@ def run(self, accessor):
return result
NodeCapsule = [
- {"name" : "Number of Connection",
+ {"name" : "NodeStatus",
+ "ingredients" : [
+ {
+ "name" : "nodeList",
+ "description" : "Node list",
+ "type" : "pythonSQL",
+ "code" : "NodeList",
+ },
+ {
+ "name" : "numNodes",
+ "description" : "Number of Nodes",
+ "type" : "SQL",
+ "stmt" : "SELECT count(*) FROM ServerNode",
+ "code" : "ExecSQL",
+ },
+ {
+ "name" : "numDownNodes",
+ "description" : "Number of Down Nodes",
+ "type" : "SQL",
+ "stmt" : "SELECT count(*) FROM ServerNode WHERE status='down'",
+ "code" : "ExecSQL",
+ },
+ {
+ "name" : "numWarmupNodes",
+ "description" : "Number of Warmup Nodes",
+ "type" : "SQL",
+ "stmt" : "SELECT count(*) FROM ServerNode WHERE status='warmup'",
+ "code" : "ExecSQL",
+ },
+ {
+ "name" : "numFailedOverNodes",
+ "description" : "Number of Nodes failed over",
+ "type" : "SQL",
+ "stmt" : "SELECT count(*) FROM ServerNode WHERE clusterMembership != 'active'",
+ "code" : "ExecSQL",
+ },
+ ],
+ "clusterwise" : False,
+ "nodewise" : True,
+ "perNode" : False,
+ "perBucket" : False,
+ },
+ {"name" : "NumberOfConnection",
"ingredients" : [
{
+ "name" : "connectionTrend",
"description" : "Connection Trend",
"counter" : "curr_connections",
"type" : "python",
@@ -49,9 +115,10 @@ def run(self, accessor):
},
]
},
- {"name" : "OOM Error",
+ {"name" : "OOMError",
"ingredients" : [
{
+ "name" : "oomErrors",
"description" : "OOM Errors",
"counter" : "ep_oom_errors",
"type" : "python",
@@ -59,6 +126,7 @@ def run(self, accessor):
"code" : "CalcTrend",
},
{
+ "name" : "tempOomErrors",
"description" : "Temporary OOM Errors",
"counter" : "ep_tmp_oom_errors",
"type" : "python",
@@ -70,6 +138,7 @@ def run(self, accessor):
{"name" : "Overhead",
"ingredients" : [
{
+ "name" : "overhead",
"description" : "Overhead",
"counter" : "ep_overhead",
"type" : "python",
@@ -77,7 +146,18 @@ def run(self, accessor):
"code" : "CalcTrend",
},
]
- },
+ },
+ {"name" : "bucketList",
+ "ingredients" : [
+ {
+ "name" : "bucketList",
+ "description" : "Bucket list",
+ "type" : "pythonSQL",
+ "code" : "BucketList",
+ },
+ ],
+ "nodewise" : True,
+ },
]

0 comments on commit 10a0fca

Please sign in to comment.
Something went wrong with that request. Please try again.