Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

first cut of design

  • Loading branch information...
commit c3756a8ce07bb4ac13710fba4cca4701e4f0fea6 1 parent 6872a6b
@bcui6611 bcui6611 authored
View
25 analyzer.py
@@ -0,0 +1,25 @@
+import cluster_stats
+import diskqueue_stats
+import dbaccessor
+
+class StatsAnalyzer:
+ def __init__(self):
+ self.accessor = dbaccessor.DbAccesor()
+
+ def run_analysis(self):
+ self.accessor.connect_db()
+ self.accessor.browse_db()
+
+ #print cluster_stats.ClusterCapsule
+ for pill in cluster_stats.ClusterCapsule:
+ #print pill['name']
+ for counter in pill['ingredients']:
+ if counter['type'] == 'SQL':
+ result = self.accessor.execute(counter['code'])
+ print counter["description"], ":", result[0]
+ elif counter['type'] == 'python':
+ result = eval("cluster_stats.{0}().run(self.accessor)".format(counter['code']))
+ print counter["description"], ": ", result
+
+ self.accessor.close()
+ self.accessor.remove_db()
View
115 buckets.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from usage import usage
+
+import restclient
+
# REST endpoint templates, keyed by sub-command name.  The {0}/{1} slots
# are filled with bucket name and stat name respectively.
rest_cmds = {
    'bucket-list': '/pools/default/buckets',
    'bucket-stats': '/pools/default/buckets/{0}/stats?zoom=hour',
    'bucket-node-stats': '/pools/default/buckets/{0}/stats/{1}?zoom=day',
}

# HTTP verb per sub-command; all bucket commands are read-only GETs.
methods = dict.fromkeys(['bucket-list', 'bucket-stats', 'bucket-node-stats'],
                        'GET')

# The per-node bucket counters that healthChecker collects.
stats = [
    'cmd_get',
    'cmd_set',
    'delete_hits',
    'ep_tap_total_total_backlog_size',
    'curr_items',
    'curr_items_tot',
    'disk_write_queue',
    'curr_connections',
]
+
class Buckets:
    """Handler for the bucket-list REST sub-command."""

    def __init__(self):
        self.debug = False
        self.rest_cmd = rest_cmds['bucket-list']
        self.method = 'GET'

    def runCmd(self, cmd, server, port,
               user, password, opts):
        """Issue the REST call named by cmd and return its parsed JSON."""
        self.user = user
        self.password = password

        # Bucket-creation style options are accepted and collected, though
        # only -d/--debug and -o/--output influence this read-only command.
        bucketname = ''
        buckettype = ''
        authtype = 'sasl'
        bucketport = '11211'
        bucketpassword = ''
        bucketramsize = ''
        bucketreplication = '1'
        output = 'default'

        for opt, arg in opts:
            if opt in ('-b', '--bucket'):
                bucketname = arg
            elif opt == '--bucket-type':
                buckettype = arg
            elif opt == '--bucket-port':
                bucketport = arg
            elif opt == '--bucket-password':
                bucketpassword = arg
            elif opt == '--bucket-ramsize':
                bucketramsize = arg
            elif opt == '--bucket-replica':
                bucketreplication = arg
            elif opt in ('-d', '--debug'):
                self.debug = True
            elif opt in ('-o', '--output'):
                output = arg

        self.rest_cmd = rest_cmds[cmd]
        rest = restclient.RestClient(server, port, {'debug': self.debug})

        # Messages shown by the REST layer on failure/success.
        opts = {'error_msg': "unable to %s" % cmd,
                'success_msg': "%s" % cmd}
        data = rest.restCmd(methods[cmd], self.rest_cmd,
                            self.user, self.password, opts)
        return rest.getJson(data)
+
class BucketStats:
    """Handler for bucket-stats: hourly stats for one bucket."""

    def __init__(self, bucket_name):
        self.debug = False
        # Bucket name is baked into the endpoint at construction time.
        self.rest_cmd = rest_cmds['bucket-stats'].format(bucket_name)
        self.method = 'GET'

    def runCmd(self, cmd, server, port,
               user, password, opts):
        """Fetch the stats endpoint and return the parsed JSON."""
        opts = {'error_msg': "unable to %s" % cmd,
                'success_msg': "%s" % cmd}

        rest = restclient.RestClient(server, port, {'debug': self.debug})
        data = rest.restCmd(methods[cmd], self.rest_cmd,
                            user, password, opts)
        return rest.getJson(data)
+
class BucketNodeStats:
    """Handler for bucket-node-stats: one stat, per node, for one bucket."""

    def __init__(self, bucket_name, stat_name):
        self.debug = False
        # Both the bucket and the stat are fixed into the endpoint here.
        self.rest_cmd = rest_cmds['bucket-node-stats'].format(bucket_name,
                                                              stat_name)
        self.method = 'GET'

    def runCmd(self, cmd, server, port,
               user, password, opts):
        """Fetch the per-node stat endpoint and return the parsed JSON."""
        opts = {'error_msg': "unable to %s" % cmd,
                'success_msg': "%s" % cmd}

        rest = restclient.RestClient(server, port, {'debug': self.debug})
        data = rest.restCmd(methods[cmd], self.rest_cmd,
                            user, password, opts)
        return rest.getJson(data)
+
View
19 cluster.py
@@ -0,0 +1,19 @@
+
# Diagnosis capsule describing node-status counters.
#
# BUG FIX: the original was a dict literal that repeated the key "counter"
# three times, so two of the three counters were silently discarded (later
# duplicate keys overwrite earlier ones), and the key spellings were
# inconsistent ("Descripition"/"Description").  Rewritten as a list of
# pills in the same shape as cluster_stats.ClusterCapsule and
# diskqueue_stats.DiskQueueCapsule so an analyzer can iterate it.
ClusterCapsule = [
    {"name": "Node Status",
     "ingredients": [
        {
            "description": "Number of Nodes",
            "type": "SQL",
            "code": "SELECT count(*) FROM ServerNode"
        },
        {
            "description": "Number of Down Nodes",
            "type": "SQL",
            "code": "SELECT count(*) FROM ServerNode WHERE status='down'"
        },
        {
            "description": "Number of Warmup Nodes",
            "type": "SQL",
            "code": "SELECT count(*) FROM ServerNode WHERE status='warmup'"
        }
     ]
    }
]
View
91 cluster_stats.py
@@ -0,0 +1,91 @@
+import dbaccessor
+
class DGMRatio:
    """Disk-greater-than-memory ratio: data held on hdd vs data in ram."""

    def run(self, accessor):
        disk_used = accessor.execute(
            "SELECT sum(usedbyData) FROM StorageInfo WHERE type='hdd'")
        ram_used = accessor.execute(
            "SELECT sum(usedbyData) FROM StorageInfo WHERE type='ram'")
        # A non-positive ram total yields 0 rather than a division error.
        return disk_used[0] / ram_used[0] if ram_used[0] > 0 else 0
+
class ARRatio:
    """Ratio of active resident items to replica resident items."""

    def run(self, accessor):
        active_items = accessor.execute(
            "SELECT sum(currentItems) FROM SystemStats")
        replica_items = accessor.execute(
            "SELECT sum(replicaCurrentItems) FROM SystemStats")
        # No replicas (or zero count) reports 0 instead of dividing by zero.
        return active_items[0] / replica_items[0] if replica_items[0] > 0 else 0
+
class OpsRatio:
    """Relative share (percent) of get/set/delete operations.

    Returns a (get_pct, set_pct, del_pct) tuple.
    """

    def run(self, accessor):
        ops = accessor.execute(
            "SELECT sum(getOps), sum(setOps), sum(delOps) FROM BucketOps")
        total = accessor.execute("SELECT count(*) FROM BucketOps")
        # BUG FIX: with no BucketOps samples (or all-zero ops) the original
        # raised ZeroDivisionError; report a zero split instead.
        if total[0] == 0:
            return (0, 0, 0)
        get_avg = ops[0] / total[0]
        set_avg = ops[1] / total[0]
        del_avg = ops[2] / total[0]
        total_avg = get_avg + set_avg + del_avg
        if total_avg == 0:
            return (0, 0, 0)
        return (get_avg / total_avg * 100,
                set_avg / total_avg * 100,
                del_avg / total_avg * 100)
+
# Cluster-wide diagnosis capsule.  Each pill has a display "name" and a
# list of "ingredients"; an ingredient is either raw SQL run against the
# snapshot DB ("type": "SQL") or the name of an analyzer class defined in
# this module ("type": "python").
ClusterCapsule = [
    {
        "name": "Node Status",
        "ingredients": [
            {"description": "Number of Nodes",
             "type": "SQL",
             "code": "SELECT count(*) FROM ServerNode"},
            {"description": "Number of Down Nodes",
             "type": "SQL",
             "code": "SELECT count(*) FROM ServerNode WHERE status='down'"},
            {"description": "Number of Warmup Nodes",
             "type": "SQL",
             "code": "SELECT count(*) FROM ServerNode WHERE status='warmup'"},
        ],
    },
    {
        "name": "Total Data Size",
        "ingredients": [
            {"description": "Total Data Size",
             "type": "SQL",
             "code": "SELECT sum(usedbyData) FROM StorageInfo WHERE type='hdd'"},
        ],
    },
    {
        "name": "DGM",
        "ingredients": [
            {"description": "DGM Ratio",
             "type": "python",
             "code": "DGMRatio"},
        ],
    },
    {
        "name": "Active / Replica Resident Ratio",
        "ingredients": [
            {"description": " A/R Ratio",
             "type": "python",
             "code": "ARRatio"},
        ],
    },
    {
        "name": "OPS performance",
        "ingredients": [
            {"description": "Read/Write/Delete ops ratio",
             "type": "python",
             "code": "OpsRatio"},
        ],
    },
]
+
+
View
284 dbaccessor.py
@@ -0,0 +1,284 @@
+try:
+ import sqlite3
+except:
+ print "healthchecker requires python version 2.6 or greater"
+ sys.exit(1)
+import os
+
class DbAccesor:
    """SQLite-backed store for one cluster health-check snapshot.

    (The class name keeps the original 'DbAccesor' spelling because
    callers import it under that name.)
    """

    def __init__(self):
        self.conn = None
        self.cursor = None
        self.db_file = "perfmon.db"

    def connect_db(self):
        """Open (creating if necessary) the snapshot database."""
        self.conn = sqlite3.connect(self.db_file)
        self.cursor = self.conn.cursor()

    def close(self):
        """Commit pending work and release the connection."""
        self.conn.commit()
        self.cursor.close()
        self.conn.close()
        self.conn = None
        self.cursor = None

    def remove_db(self):
        """Delete the snapshot database file from disk."""
        os.remove(self.db_file)

    def create_databases(self):
        """Create all snapshot tables if they do not already exist."""
        self.cursor.execute(""" CREATE TABLE IF NOT EXISTS ServerNode (
            serverId INTEGER PRIMARY KEY,
            host TEXT NOT NULL,
            port INTEGER NOT NULL,
            status TEXT,
            portDirect INTEGER,
            portProxy INTEGER,
            clusterMembership INTEGER,
            os TEXT,
            uptime INTEGER,
            version TEXT)""")

        self.cursor.execute(""" CREATE TABLE IF NOT EXISTS DiskInfo (
            diskInfoId INTEGER PRIMARY KEY,
            path TEXT NOT NULL,
            sizeBytes INTEGER,
            usagePercent INTEGER,
            serverId INTEGER,
            FOREIGN KEY(serverId) REFERENCES ServerNode(serverId))""")

        self.cursor.execute(""" CREATE TABLE IF NOT EXISTS MemoryInfo (
            memoryInfoId INTEGER PRIMARY KEY,
            allocated INTEGER,
            reserved INTEGER,
            free INTEGER,
            quota INTEGER,
            total INTEGER,
            serverId INTEGER,
            FOREIGN KEY(serverId) REFERENCES ServerNode(serverId))""")

        self.cursor.execute(""" CREATE TABLE IF NOT EXISTS StorageInfo (
            storageInfoId INTEGER PRIMARY KEY,
            type TEXT,
            free REAL,
            quotaTotal REAL,
            total REAL,
            used REAL,
            usedbyData REAL,
            serverId INTEGER,
            FOREIGN KEY(serverId) REFERENCES ServerNode(serverId))""")

        self.cursor.execute(""" CREATE TABLE IF NOT EXISTS SystemStats (
            id INTEGER PRIMARY KEY,
            cpuUtilization REAL,
            swapTotal REAL,
            swapUsed REAL,
            currentItems INTEGER,
            currentItemsTotal INTEGER,
            replicaCurrentItems INTEGER,
            serverId INTEGER,
            FOREIGN KEY(serverId) REFERENCES ServerNode(serverId))""")

        self.cursor.execute(""" CREATE TABLE IF NOT EXISTS Bucket (
            bucketId INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            type TEXT NOT NULL,
            authType TEXT,
            saslPassword TEXT,
            numReplica INTEGER,
            ramQuota REAL)""")

        self.cursor.execute(""" CREATE TABLE IF NOT EXISTS BucketStats (
            id INTEGER PRIMARY KEY,
            diskUsed REAL,
            memUsed REAL,
            diskFetch INTEGER,
            quotaPercentUsed REAL,
            opsPerSec INTEGER,
            itemCount INTEGER,
            bucketId INTEGER,
            timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(bucketId) REFERENCES Bucket(bucketId))""")

        self.cursor.execute(""" CREATE TABLE IF NOT EXISTS BucketOps (
            id INTEGER PRIMARY KEY,
            getOps REAL,
            setOps REAL,
            delOps REAL,
            diskWriteQueue REAL,
            bucketId INTEGER,
            timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(bucketId) REFERENCES Bucket(bucketId))""")

    def create_or_update_node(self, host, port, status):
        """Insert/replace a node row; return its rowid.

        Uses '?' placeholders (here and below) instead of str.format so
        values with quotes cannot break or inject into the SQL.
        """
        self.cursor.execute(
            """INSERT OR REPLACE INTO ServerNode (host, port, status)
               VALUES (?, ?, ?)""", (host, port, status))
        return self.cursor.lastrowid

    def process_node_stats(self, nodeId, nodeInfo):
        """Store node detail plus memory, storage and system stats.

        nodeInfo is the parsed /nodes/self JSON for one node.
        """
        self.cursor.execute(
            """ UPDATE ServerNode
                SET portDirect=?, portProxy=?, clusterMembership=?,
                    os=?, uptime=?, version=?
                WHERE serverId = ?""",
            (nodeInfo['ports']['direct'],
             nodeInfo['ports']['proxy'],
             nodeInfo['clusterMembership'],
             nodeInfo['os'],
             nodeInfo['uptime'],
             nodeInfo['version'],
             nodeId))

        # memory
        self.cursor.execute(
            """ INSERT OR REPLACE INTO MemoryInfo
                (allocated, reserved, free, quota, total, serverId)
                VALUES (?, ?, ?, ?, ?, ?)""",
            (nodeInfo['mcdMemoryAllocated'],
             nodeInfo['mcdMemoryReserved'],
             nodeInfo['memoryFree'],
             nodeInfo['memoryQuota'],
             nodeInfo['memoryTotal'],
             nodeId))

        # storage totals, one row per storage type
        storage_stmt = """ INSERT OR REPLACE INTO StorageInfo
            (type, free, quotaTotal, total, used, usedbyData, serverId)
            VALUES (?, ?, ?, ?, ?, ?, ?)"""
        if nodeInfo['storageTotals'] is not None:
            hdd = nodeInfo['storageTotals']['hdd']
            if hdd is not None:
                self.cursor.execute(storage_stmt,
                                    ('hdd', hdd['free'], hdd['quotaTotal'],
                                     hdd['total'], hdd['used'],
                                     hdd['usedByData'], nodeId))
            ram = nodeInfo['storageTotals']['ram']
            # BUG FIX: the original checked and read the *hdd* dict here,
            # so the 'ram' row was written with hdd numbers; use ram's own
            # values (and guard on ram itself).
            if ram is not None:
                self.cursor.execute(storage_stmt,
                                    ('ram', ram['free'], ram['quotaTotal'],
                                     ram['total'], ram['used'],
                                     ram['usedByData'], nodeId))

        # system stats ('SystemSTats' typo fixed; sqlite table names are
        # case-insensitive, so the target table is unchanged)
        interesting = nodeInfo['interestingStats']
        if interesting is not None:
            # dict.get replaces the Python-2-only has_key calls.
            curr_items = interesting.get('curr_items', 0)
            curr_items_tot = interesting.get('curr_items_tot', 0)
            vb_rep_curr_items = interesting.get('vb_replica_curr_items', 0)
        else:
            curr_items = 0
            curr_items_tot = 0
            vb_rep_curr_items = 0
        self.cursor.execute(
            """ INSERT OR REPLACE INTO SystemStats
                (cpuUtilization, swapTotal, swapUsed, currentItems,
                 currentItemsTotal, replicaCurrentItems, serverId)
                VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (nodeInfo['systemStats']['cpu_utilization_rate'],
             nodeInfo['systemStats']['swap_total'],
             nodeInfo['systemStats']['swap_used'],
             curr_items,
             curr_items_tot,
             vb_rep_curr_items,
             nodeId))

        return True

    def process_bucket(self, bucket):
        """Store one bucket definition and its basic stats.

        Returns (bucket_name, bucketId).
        """
        self.cursor.execute(
            """INSERT OR REPLACE INTO Bucket
               (name, type, authType, saslPassword, numReplica, ramQuota)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (bucket['name'],
             bucket['bucketType'],
             bucket['authType'],
             bucket['saslPassword'],
             bucket['replicaNumber'],
             bucket['quota']['ram']))
        bucketId = self.cursor.lastrowid

        bucketStats = bucket['basicStats']
        self.cursor.execute(
            """INSERT INTO BucketStats
               (diskUsed, memUsed, diskFetch, quotaPercentUsed,
                opsPerSec, itemCount, bucketId)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (bucketStats['diskUsed'],
             bucketStats['memUsed'],
             bucketStats['diskFetches'],
             bucketStats['quotaPercentUsed'],
             bucketStats['opsPerSec'],
             bucketStats['itemCount'],
             bucketId))
        return (bucket['name'], bucketId)

    def process_bucket_stats(self, bucket_id, json):
        """Average the hourly op samples for a bucket and store one
        BucketOps row."""
        total_samples = json["op"]["samplesCount"]
        samples = json["op"]["samples"]
        get_avg = sum(samples["cmd_get"]) / total_samples
        set_avg = sum(samples["cmd_set"]) / total_samples
        del_avg = sum(samples["delete_hits"]) / total_samples
        disk_write_queue_avg = sum(samples["disk_write_queue"]) / total_samples
        self.cursor.execute(
            """INSERT OR REPLACE INTO BucketOps
               (getOps, setOps, delOps, diskWriteQueue, bucketId)
               VALUES (?, ?, ?, ?, ?)""",
            (get_avg, set_avg, del_avg, disk_write_queue_avg, bucket_id))

    def process_bucket_node_stats(self, bucket_id, node_name, stat, jason):
        """Placeholder -- per-node bucket stats are not persisted yet."""
        # TODO: aggregate the per-node samples and insert into BucketOps,
        # mirroring process_bucket_stats above.
        pass

    def extract_result(self, rows):
        """Return the first row of a result set, or [0] when there is none.

        BUG FIX: the original implicitly returned None for an empty (but
        non-None) row list; callers index the result, so normalize to [0].
        """
        if rows:
            return rows[0]
        return [0]

    def execute(self, stmt):
        """Run stmt and return its first result row (or [0] if none)."""
        self.cursor.execute(stmt)
        return self.extract_result(self.cursor.fetchall())

    def browse_table(self, table):
        """Dump every row of a table to stdout (debugging aid).

        Table names cannot be bound as SQL parameters; 'table' is only
        ever one of the fixed names below.
        """
        self.cursor.execute("SELECT * from {0}".format(table))
        for row in self.cursor.fetchall():
            print(row)

    def browse_db(self):
        """Dump all snapshot tables (debugging aid)."""
        self.browse_table("ServerNode")
        self.browse_table("MemoryInfo")
        self.browse_table("StorageInfo")
        self.browse_table("SystemStats")
        self.browse_table("Bucket")
        self.browse_table("BucketStats")
        self.browse_table("BucketOps")
View
39 diskqueue_stats.py
@@ -0,0 +1,39 @@
+import dbaccessor
+
class DiskQueue:
    """Average disk-write-queue length over all BucketOps samples."""

    def run(self, accessor):
        total_queued = accessor.execute(
            "SELECT sum(diskWriteQueue) FROM BucketOps")
        sample_count = accessor.execute("SELECT count(*) FROM BucketOps")
        return total_queued[0] / sample_count[0]
+
class TotalOps:
    """Total average ops plus the get/set/delete percentage split.

    Returns (total_avg, get_pct, set_pct, del_pct).
    """

    def run(self, accessor):
        sums = accessor.execute(
            "SELECT sum(getOps), sum(setOps), sum(delOps) FROM BucketOps")
        count = accessor.execute("SELECT count(*) FROM BucketOps")
        get_avg, set_avg, del_avg = [col / count[0] for col in sums[:3]]
        overall = get_avg + set_avg + del_avg
        return (overall,
                get_avg / overall * 100,
                set_avg / overall * 100,
                del_avg / overall * 100)
+
# Diagnosis capsule for disk-write-queue health.  Each ingredient names an
# analyzer class defined above via its "code" field; "threshold" gives the
# alerting bounds for the averaged queue length.
DiskQueueCapsule = [
    {
        "name": "Disk Queue Diagnosis",
        "ingredients": [
            {
                "description": "Avg Disk queue length",
                "type": "python",
                "code": "DiskQueue",
                "threshold": {"low": 50000000, "high": 1000000000},
            },
            {
                "description": "Total OPS",
                "type": "python",
                "code": "TotalOps",
            },
        ],
    },
]
View
160 healthChecker.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import getopt
+import sys
+import os
+
+import dbaccessor
+import analyzer
+
+import listservers
+import buckets
+import node
+import info
+import util_cli as util
+import mc_bin_client
+import simplejson
+
def parse_opt():
    """Parse command-line options.

    Returns (cluster, user, password, opts) where opts is the raw getopt
    option list, forwarded unchanged to the sub-command handlers.
    """
    cluster = user = password = ''

    try:
        opts, _args = getopt.getopt(sys.argv[1:],
                                    'c:dp:u:', [
                                        'cluster=',
                                        'debug',
                                        'password=',
                                        'user='
                                    ])
    except getopt.GetoptError as err:
        # BUG FIX: this module never imports 'usage', so this path raised
        # NameError instead of printing help; import it locally on demand.
        from usage import usage
        usage(err)

    for opt, arg in opts:
        if opt in ('-c', '--cluster'):
            cluster = arg
        if opt in ('-u', '--user'):
            user = arg
        if opt in ('-p', '--password'):
            password = arg
        if opt in ('-d', '--debug'):
            debug = True  # NOTE(review): collected but never used downstream
    if not cluster:
        from usage import usage  # see BUG FIX note above
        usage("please provide a CLUSTER, or use -h for more help.")
    return (cluster, user, password, opts)
+
def get_stats(mc, stats):
    """Merge 'stats all' and 'stats tap' from a memcached client into the
    given stats dict.  Exits the process on any client error."""
    for group in ('', 'tap'):
        try:
            node_stats = mc.stats(group)
            if node_stats:
                for key, val in node_stats.items():
                    stats[key] = val
        except Exception as err:
            # BUG FIX: the original formatted the undefined names
            # server/port here, turning any client failure into a
            # NameError; use the client's own host/port instead.
            print("ERROR: command: %s: %s:%d, %s"
                  % ('stats ' + (group or 'all'), mc.host, mc.port, err))
            sys.exit(1)
+
def stats_formatter(stats, prefix=" ", cmp=None):
    """Pretty-print a stats dict as aligned 'key: value' lines.

    cmp is an optional Python-2 comparison function forwarded to sorted().
    """
    if not stats:
        return
    width = max(len(name) + 2 for name in stats.keys())
    for name, value in sorted(stats.items(), cmp=cmp):
        label = name + ":"
        print("%s%s%s" % (prefix, label.ljust(width), value))
+
def collect_data():
    """Take a snapshot of the cluster.

    Node, bucket and per-node bucket stats are fetched over REST /
    memcached and stored through dbaccessor.  Exits the process on any
    command failure.
    """
    (cluster, user, password, opts) = parse_opt()
    server, port = util.hostport(cluster)

    nodes = []
    commands = {
        'host-list': listservers.ListServers,
        'server-info': info.Info,
        'bucket-list': buckets.Buckets,
        'bucket-stats': buckets.BucketStats,
        'bucket-node-stats': buckets.BucketNodeStats,
    }

    accessor = dbaccessor.DbAccesor()
    accessor.connect_db()
    accessor.create_databases()

    # get node list and its status
    try:
        cmd = 'host-list'
        c = commands[cmd]()
        nodes = c.runCmd(cmd, server, port, user, password, opts)
    except Exception as err:
        print("ERROR: command: %s: %s:%d, %s" % (cmd, server, port, err))
        sys.exit(1)

    # get each node's information
    try:
        cmd = 'server-info'
        c = commands[cmd]()
        # BUG FIX: loop variable renamed from 'node' -- the original
        # shadowed the imported 'node' module.
        for node_entry in nodes:
            (node_server, node_port) = util.hostport(node_entry['hostname'])
            nodeid = accessor.create_or_update_node(node_server, node_port,
                                                    node_entry['status'])
            if node_entry['status'] == 'healthy':
                node_info = c.runCmd(cmd, node_server, node_port,
                                     user, password, opts)
                accessor.process_node_stats(nodeid, node_info)
                stats = {}
                mc = mc_bin_client.MemcachedClient(
                    node_server, node_entry['ports']['direct'])
                get_stats(mc, stats)
            else:
                print("Unhealthy node: %s:%s" % (node_server,
                                                 node_entry['status']))
    except Exception as err:
        print("ERROR: command: %s: %s:%d, %s" % (cmd, server, port, err))
        sys.exit(1)

    # get each bucket's information
    try:
        cmd = 'bucket-list'
        c = commands[cmd]()
        json = c.runCmd(cmd, server, port, user, password, opts)
        for bucket in json:
            (bucket_name, bucket_id) = accessor.process_bucket(bucket)

            # retrieve bucket stats per node for each tracked counter
            cmd = 'bucket-node-stats'
            for stat in buckets.stats:
                c = buckets.BucketNodeStats(bucket_name, stat)
                print("%s %s" % (stat, server))
                json = c.runCmd(cmd, server, port, user, password, opts)
                print(simplejson.dumps(json, sort_keys=False, indent=2))
                accessor.process_bucket_node_stats(bucket_id, server,
                                                   stat, json)
    except Exception as err:
        print("ERROR: command: %s: %s:%d, %s" % (cmd, server, port, err))
        sys.exit(1)

    accessor.close()
+
+
def main():
    """Snapshot the cluster, then run the analysis over the snapshot."""
    # 1. snapshot the current cluster status into the local database
    collect_data()

    # 2. analyze the snapshot and historic data
    analyzer.StatsAnalyzer().run_analysis()

if __name__ == '__main__':
    main()
View
36 info.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+ Provides info about a particular server.
+"""
+
+from usage import usage
+
+import restclient
+import simplejson
+import subprocess
+import sys
+
class Info:
    """Handler for server-info: fetch /nodes/self for one node."""

    def __init__(self):
        self.debug = False

    def runCmd(self, cmd, server, port,
               user, password, opts):
        """Fetch node info, print it (minus license fields), return it."""
        for opt, _arg in opts:
            if opt in ('-d', '--debug'):
                self.debug = True
        rest = restclient.RestClient(server, port, {'debug': self.debug})

        data = rest.restCmd('GET', '/nodes/self', user, password,
                            {'error_msg': 'server-info error'})
        json = rest.getJson(data)

        # License details are irrelevant to health checking; drop them
        # from the printed/returned payload.
        for field in ['license', 'licenseValid', 'licenseValidUntil']:
            if field in json:
                del json[field]
        print(simplejson.dumps(json, sort_keys=True, indent=2))
        return json
View
81 listservers.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import sys
+import restclient
+
+from usage import usage
+
class ListServers:
    """Handler for host-list: fetch /pools/default and report its nodes."""

    def __init__(self):
        self.rest_cmd = '/pools/default'
        self.method = 'GET'
        self.output = 'standard'
        self.debug = False
        self.user = ''
        self.password = ''
        self.error = ''

    def runCmd(self, cmd, server, port,
               user, password, opts,):
        """Fetch the pool, print it per the -o mode, return the node list."""
        self.cmd = cmd
        self.server = server
        self.port = port
        self.user = user
        self.password = password
        for opt, arg in opts:
            if opt in ('-o', '--output'):
                self.output = arg
            if opt in ('-d', '--debug'):
                self.debug = True

        data = self.getData(self.server,
                            self.port,
                            self.user,
                            self.password)
        if self.output == 'json':
            print(data)
        else:
            # obtain the node list; a non-list result means an error
            # message was recorded in self.error
            nodes = self.getNodes(data)
            if type(nodes) == type(list()):
                self.printNodes(nodes)
            else:
                print(self.error)
            return nodes

    def getData(self, server, port, user, password):
        """Fetch the raw JSON text from the server.

        Connection details are explicit parameters (rather than read from
        self) so getData() stays callable externally.
        """
        self.rest = restclient.RestClient(server, port,
                                          {'debug': self.debug})
        return self.rest.restCmd('GET', self.rest_cmd,
                                 user, password)

    def getNodes(self, data):
        """Deserialize json into nodes; record errors and return None."""
        json = self.rest.getJson(data)
        if type(json) == type(unicode()):
            # a bare string back from the server is an error message
            self.error = json
            return None
        elif type(json) == type(list()):
            self.error = json[0]
            return None
        return json['nodes']

    def printNodes(self, nodes):
        """Print one line per node; host-list shows only hostnames."""
        for entry in nodes:
            if self.cmd == "host-list":
                print(entry['hostname'])
            else:
                print('%s %s %s %s' % (entry['otpNode'],
                                       entry['hostname'],
                                       entry['status'],
                                       entry['clusterMembership']))
View
442 mc_bin_client.py
@@ -0,0 +1,442 @@
+#!/usr/bin/env python
+"""
+Binary memcached test client.
+
+Copyright (c) 2007 Dustin Sallings <dustin@spy.net>
+"""
+
+import sys
+import time
+import hmac
+import socket
+import random
+import struct
+import exceptions
+
+from memcacheConstants import REQ_MAGIC_BYTE, RES_MAGIC_BYTE
+from memcacheConstants import REQ_PKT_FMT, RES_PKT_FMT, MIN_RECV_PACKET
+from memcacheConstants import SET_PKT_FMT, DEL_PKT_FMT, INCRDECR_RES_FMT
+from memcacheConstants import TOUCH_PKT_FMT, GAT_PKT_FMT, GETL_PKT_FMT
+import memcacheConstants
+
class MemcachedError(exceptions.Exception):
    """Error raised when a memcached command fails."""

    def __init__(self, status, msg):
        # repr() replaces the deprecated backtick syntax (removed in
        # Python 3); identical output for integer status codes.
        supermsg = 'Memcached error #' + repr(status)
        if msg:
            supermsg += ": " + msg
        exceptions.Exception.__init__(self, supermsg)

        # protocol status code and the server-supplied message
        self.status = status
        self.msg = msg

    def __repr__(self):
        return "<MemcachedError #%d ``%s''>" % (self.status, self.msg)
+
+class MemcachedClient(object):
+ """Simple memcached client."""
+
+ vbucketId = 0
+
+ def __init__(self, host='127.0.0.1', port=11211):
+ self.host = host
+ self.port = port
+ self.s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.s.connect_ex((host, port))
+ self.r=random.Random()
+
+ def close(self):
+ self.s.close()
+
+ def __del__(self):
+ self.close()
+
+ def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0):
+ self._sendMsg(cmd, key, val, opaque, extraHeader=extraHeader, cas=cas,
+ vbucketId=self.vbucketId)
+
+ def _sendMsg(self, cmd, key, val, opaque, extraHeader='', cas=0,
+ dtype=0, vbucketId=0,
+ fmt=REQ_PKT_FMT, magic=REQ_MAGIC_BYTE):
+ msg=struct.pack(fmt, magic,
+ cmd, len(key), len(extraHeader), dtype, vbucketId,
+ len(key) + len(extraHeader) + len(val), opaque, cas)
+ self.s.send(msg + extraHeader + key + val)
+
+ def _recvMsg(self):
+ response = ""
+ while len(response) < MIN_RECV_PACKET:
+ data = self.s.recv(MIN_RECV_PACKET - len(response))
+ if data == '':
+ raise exceptions.EOFError("Got empty data (remote died?).")
+ response += data
+ assert len(response) == MIN_RECV_PACKET
+ magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas=\
+ struct.unpack(RES_PKT_FMT, response)
+
+ rv = ""
+ while remaining > 0:
+ data = self.s.recv(remaining)
+ if data == '':
+ raise exceptions.EOFError("Got empty data (remote died?).")
+ rv += data
+ remaining -= len(data)
+
+ assert (magic in (RES_MAGIC_BYTE, REQ_MAGIC_BYTE)), "Got magic: %d" % magic
+ return cmd, errcode, opaque, cas, keylen, extralen, rv
+
+ def _handleKeyedResponse(self, myopaque):
+ cmd, errcode, opaque, cas, keylen, extralen, rv = self._recvMsg()
+ assert myopaque is None or opaque == myopaque, \
+ "expected opaque %x, got %x" % (myopaque, opaque)
+ if errcode != 0:
+ raise MemcachedError(errcode, rv)
+ return cmd, opaque, cas, keylen, extralen, rv
+
+ def _handleSingleResponse(self, myopaque):
+ cmd, opaque, cas, keylen, extralen, data = self._handleKeyedResponse(myopaque)
+ return opaque, cas, data
+
+ def _doCmd(self, cmd, key, val, extraHeader='', cas=0):
+ """Send a command and await its response."""
+ opaque=self.r.randint(0, 2**32)
+ self._sendCmd(cmd, key, val, opaque, extraHeader, cas)
+ return self._handleSingleResponse(opaque)
+
+ def _mutate(self, cmd, key, exp, flags, cas, val):
+ return self._doCmd(cmd, key, val, struct.pack(SET_PKT_FMT, flags, exp),
+ cas)
+
+ def _cat(self, cmd, key, cas, val):
+ return self._doCmd(cmd, key, val, '', cas)
+
+ def append(self, key, value, cas=0):
+ return self._cat(memcacheConstants.CMD_APPEND, key, cas, value)
+
+ def prepend(self, key, value, cas=0):
+ return self._cat(memcacheConstants.CMD_PREPEND, key, cas, value)
+
+ def __incrdecr(self, cmd, key, amt, init, exp):
+ something, cas, val=self._doCmd(cmd, key, '',
+ struct.pack(memcacheConstants.INCRDECR_PKT_FMT, amt, init, exp))
+ return struct.unpack(INCRDECR_RES_FMT, val)[0], cas
+
+ def incr(self, key, amt=1, init=0, exp=0):
+ """Increment or create the named counter."""
+ return self.__incrdecr(memcacheConstants.CMD_INCR, key, amt, init, exp)
+
+ def decr(self, key, amt=1, init=0, exp=0):
+ """Decrement or create the named counter."""
+ return self.__incrdecr(memcacheConstants.CMD_DECR, key, amt, init, exp)
+
+ def set(self, key, exp, flags, val):
+ """Set a value in the memcached server."""
+ return self._mutate(memcacheConstants.CMD_SET, key, exp, flags, 0, val)
+
+ def add(self, key, exp, flags, val):
+ """Add a value in the memcached server iff it doesn't already exist."""
+ return self._mutate(memcacheConstants.CMD_ADD, key, exp, flags, 0, val)
+
+ def replace(self, key, exp, flags, val):
+ """Replace a value in the memcached server iff it already exists."""
+ return self._mutate(memcacheConstants.CMD_REPLACE, key, exp, flags, 0,
+ val)
+
+ def __parseGet(self, data, klen=0):
+ flags=struct.unpack(memcacheConstants.GET_RES_FMT, data[-1][:4])[0]
+ return flags, data[1], data[-1][4 + klen:]
+
+ def get(self, key):
+ """Get the value for a given key within the memcached server."""
+ parts=self._doCmd(memcacheConstants.CMD_GET, key, '')
+ return self.__parseGet(parts)
+
+ def getl(self, key, exp=15):
+ """Get the value for a given key within the memcached server."""
+ parts=self._doCmd(memcacheConstants.CMD_GET_LOCKED, key, '',
+ struct.pack(memcacheConstants.GETL_PKT_FMT, exp))
+ return self.__parseGet(parts)
+
+ def cas(self, key, exp, flags, oldVal, val):
+ """CAS in a new value for the given key and comparison value."""
+ self._mutate(memcacheConstants.CMD_SET, key, exp, flags,
+ oldVal, val)
+
+ def touch(self, key, exp):
+ """Touch a key in the memcached server."""
+ return self._doCmd(memcacheConstants.CMD_TOUCH, key, '',
+ struct.pack(memcacheConstants.TOUCH_PKT_FMT, exp))
+
+ def gat(self, key, exp):
+ """Get the value for a given key and touch it within the memcached server."""
+ parts=self._doCmd(memcacheConstants.CMD_GAT, key, '',
+ struct.pack(memcacheConstants.GAT_PKT_FMT, exp))
+ return self.__parseGet(parts)
+
+ def version(self):
+ """Get the value for a given key within the memcached server."""
+ return self._doCmd(memcacheConstants.CMD_VERSION, '', '')
+
+ def sasl_mechanisms(self):
+ """Get the supported SASL methods."""
+ return set(self._doCmd(memcacheConstants.CMD_SASL_LIST_MECHS,
+ '', '')[2].split(' '))
+
+ def sasl_auth_start(self, mech, data):
+ """Start a sasl auth session."""
+ return self._doCmd(memcacheConstants.CMD_SASL_AUTH, mech, data)
+
+ def sasl_auth_plain(self, user, password, foruser=''):
+ """Perform plain auth."""
+ return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user, password]))
+
+ def sasl_auth_cram_md5(self, user, password):
+ """Start a plan auth session."""
+ try:
+ self.sasl_auth_start('CRAM-MD5', '')
+ except MemcachedError, e:
+ if e.status != memcacheConstants.ERR_AUTH_CONTINUE:
+ raise
+ challenge = e.msg
+
+ dig = hmac.HMAC(password, challenge).hexdigest()
+ return self._doCmd(memcacheConstants.CMD_SASL_STEP, 'CRAM-MD5',
+ user + ' ' + dig)
+
+ def stop_persistence(self):
+ return self._doCmd(memcacheConstants.CMD_STOP_PERSISTENCE, '', '')
+
+ def start_persistence(self):
+ return self._doCmd(memcacheConstants.CMD_START_PERSISTENCE, '', '')
+
+ def set_flush_param(self, key, val):
+ print "setting flush param:", key, val
+ return self._doCmd(memcacheConstants.CMD_SET_FLUSH_PARAM, key, val)
+
+ def stop_replication(self):
+ return self._doCmd(memcacheConstants.CMD_STOP_REPLICATION, '', '')
+
+ def start_replication(self):
+ return self._doCmd(memcacheConstants.CMD_START_REPLICATION, '', '')
+
+ def start_onlineupdate(self):
+ return self._doCmd(memcacheConstants.CMD_START_ONLINEUPDATE, '', '')
+
+ def complete_onlineupdate(self):
+ return self._doCmd(memcacheConstants.CMD_COMPLETE_ONLINEUPDATE, '', '')
+
+ def revert_onlineupdate(self):
+ return self._doCmd(memcacheConstants.CMD_REVERT_ONLINEUPDATE, '', '')
+
+ def set_tap_param(self, key, val):
+ print "setting tap param:", key, val
+ return self._doCmd(memcacheConstants.CMD_SET_TAP_PARAM, key, val)
+
+ def set_vbucket_state(self, vbucket, stateName):
+ assert isinstance(vbucket, int)
+ self.vbucketId = vbucket
+ state = struct.pack(memcacheConstants.VB_SET_PKT_FMT,
+ memcacheConstants.VB_STATE_NAMES[stateName])
+ return self._doCmd(memcacheConstants.CMD_SET_VBUCKET_STATE, '', '', state)
+
+ def get_vbucket_state(self, vbucket):
+ return self._doCmd(memcacheConstants.CMD_GET_VBUCKET_STATE,
+ str(vbucket), '')
+
+ def delete_vbucket(self, vbucket):
+ assert isinstance(vbucket, int)
+ self.vbucketId = vbucket
+ return self._doCmd(memcacheConstants.CMD_DELETE_VBUCKET, '', '')
+
+ def evict_key(self, key):
+ return self._doCmd(memcacheConstants.CMD_EVICT_KEY, key, '')
+
+ def getMulti(self, keys):
+ """Get values for any available keys in the given iterable.
+
+ Returns a dict of matched keys to their values."""
+ opaqued=dict(enumerate(keys))
+ terminal=len(opaqued)+10
+ # Send all of the keys in quiet
+ for k,v in opaqued.iteritems():
+ self._sendCmd(memcacheConstants.CMD_GETQ, v, '', k)
+
+ self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal)
+
+ # Handle the response
+ rv={}
+ done=False
+ while not done:
+ opaque, cas, data=self._handleSingleResponse(None)
+ if opaque != terminal:
+ rv[opaqued[opaque]]=self.__parseGet((opaque, cas, data))
+ else:
+ done=True
+
+ return rv
+
+ def setMulti(self, exp, flags, items):
+ """Multi-set (using setq).
+
+ Give me (key, value) pairs."""
+
+ # If this is a dict, convert it to a pair generator
+ if hasattr(items, 'iteritems'):
+ items = items.iteritems()
+
+ opaqued=dict(enumerate(items))
+ terminal=len(opaqued)+10
+ extra=struct.pack(SET_PKT_FMT, flags, exp)
+
+ # Send all of the keys in quiet
+ for opaque,kv in opaqued.iteritems():
+ self._sendCmd(memcacheConstants.CMD_SETQ, kv[0], kv[1], opaque, extra)
+
+ self._sendCmd(memcacheConstants.CMD_NOOP, '', '', terminal)
+
+ # Handle the response
+ failed = []
+ done=False
+ while not done:
+ try:
+ opaque, cas, data = self._handleSingleResponse(None)
+ done = opaque == terminal
+ except MemcachedError, e:
+ failed.append(e)
+
+ return failed
+
+ def stats(self, sub=''):
+ """Get stats."""
+ opaque=self.r.randint(0, 2**32)
+ self._sendCmd(memcacheConstants.CMD_STAT, sub, '', opaque)
+ done = False
+ rv = {}
+ while not done:
+ cmd, opaque, cas, klen, extralen, data = self._handleKeyedResponse(None)
+ if klen:
+ rv[data[0:klen]] = data[klen:]
+ else:
+ done = True
+ return rv
+
    def noop(self):
        """Send a noop command."""
        return self._doCmd(memcacheConstants.CMD_NOOP, '', '')

    def delete(self, key, cas=0):
        """Delete the value for a given key within the memcached server.

        A non-zero cas makes the delete conditional on that version.
        """
        return self._doCmd(memcacheConstants.CMD_DELETE, key, '', '', cas)

    def flush(self, timebomb=0):
        """Flush all storage in a memcached instance.

        timebomb is packed into the FLUSH extras (delay; 0 = immediate).
        """
        return self._doCmd(memcacheConstants.CMD_FLUSH, '', '',
                           struct.pack(memcacheConstants.FLUSH_PKT_FMT, timebomb))

    def bucket_select(self, name):
        """Select the named bucket for subsequent operations."""
        return self._doCmd(memcacheConstants.CMD_SELECT_BUCKET, name, '')
+
    def sync_persistence(self, keyspecs):
        """SYNC until the given keyspecs are persisted (flag 0x8).

        Returns (opaque, cas, parsed keyspec list).
        """
        payload = self._build_sync_payload(0x8, keyspecs)

        print "sending sync for persistence command for the following keyspecs:", keyspecs
        (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
        return (opaque, cas, self._parse_sync_response(data))

    def sync_mutation(self, keyspecs):
        """SYNC until the given keyspecs are mutated (flag 0x4)."""
        payload = self._build_sync_payload(0x4, keyspecs)

        print "sending sync for mutation command for the following keyspecs:", keyspecs
        (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
        return (opaque, cas, self._parse_sync_response(data))

    def sync_replication(self, keyspecs, numReplicas=1):
        """SYNC until replicated to numReplicas (count in flag bits 4-7)."""
        payload = self._build_sync_payload((numReplicas & 0x0f) << 4, keyspecs)

        print "sending sync for replication command for the following keyspecs:", keyspecs
        (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
        return (opaque, cas, self._parse_sync_response(data))

    def sync_replication_or_persistence(self, keyspecs, numReplicas=1):
        """SYNC until replicated OR persisted (replica bits | 0x8)."""
        payload = self._build_sync_payload(((numReplicas & 0x0f) << 4) | 0x8, keyspecs)

        print "sending sync for replication or persistence command for the " \
            "following keyspecs:", keyspecs
        (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
        return (opaque, cas, self._parse_sync_response(data))

    def sync_replication_and_persistence(self, keyspecs, numReplicas=1):
        """SYNC until replicated AND persisted (replica bits | 0xA)."""
        payload = self._build_sync_payload(((numReplicas & 0x0f) << 4) | 0xA, keyspecs)

        print "sending sync for replication and persistence command for the " \
            "following keyspecs:", keyspecs
        (opaque, cas, data) = self._doCmd(memcacheConstants.CMD_SYNC, "", payload)
        return (opaque, cas, self._parse_sync_response(data))
+
+ def _build_sync_payload(self, flags, keyspecs):
+ payload = struct.pack(">I", flags)
+ payload += struct.pack(">H", len(keyspecs))
+
+ for spec in keyspecs:
+ if not isinstance(spec, dict):
+ raise TypeError("each keyspec must be a dict")
+ if not spec.has_key('vbucket'):
+ raise TypeError("missing vbucket property in keyspec")
+ if not spec.has_key('key'):
+ raise TypeError("missing key property in keyspec")
+
+ payload += struct.pack(">Q", spec.get('cas', 0))
+ payload += struct.pack(">H", spec['vbucket'])
+ payload += struct.pack(">H", len(spec['key']))
+ payload += spec['key']
+
+ return payload
+
+ def _parse_sync_response(self, data):
+ keyspecs = []
+ nkeys = struct.unpack(">H", data[0 : struct.calcsize("H")])[0]
+ offset = struct.calcsize("H")
+
+ for i in xrange(nkeys):
+ spec = {}
+ width = struct.calcsize("QHHB")
+ (spec['cas'], spec['vbucket'], keylen, eventid) = \
+ struct.unpack(">QHHB", data[offset : offset + width])
+ offset += width
+ spec['key'] = data[offset : offset + keylen]
+ offset += keylen
+
+ if eventid == memcacheConstants.CMD_SYNC_EVENT_PERSISTED:
+ spec['event'] = 'persisted'
+ elif eventid == memcacheConstants.CMD_SYNC_EVENT_MODIFED:
+ spec['event'] = 'modified'
+ elif eventid == memcacheConstants.CMD_SYNC_EVENT_DELETED:
+ spec['event'] = 'deleted'
+ elif eventid == memcacheConstants.CMD_SYNC_EVENT_REPLICATED:
+ spec['event'] = 'replicated'
+ elif eventid == memcacheConstants.CMD_SYNC_INVALID_KEY:
+ spec['event'] = 'invalid key'
+ elif spec['event'] == memcacheConstants.CMD_SYNC_INVALID_CAS:
+ spec['event'] = 'invalid cas'
+ else:
+ spec['event'] = eventid
+
+ keyspecs.append(spec)
+
+ return keyspecs
+
    def restore_file(self, filename):
        """Initiate restore of a given file (filename sent as the key)."""
        return self._doCmd(memcacheConstants.CMD_RESTORE_FILE, filename, '', '', 0)

    def restore_complete(self):
        """Notify the server that we're done restoring."""
        return self._doCmd(memcacheConstants.CMD_RESTORE_COMPLETE, '', '', '', 0)

    def deregister_tap_client(self, tap_name):
        """Deregister the TAP client with a given name."""
        return self._doCmd(memcacheConstants.CMD_DEREGISTER_TAP_CLIENT, tap_name, '', '', 0)

    def reset_replication_chain(self):
        """Reset the replication chain."""
        return self._doCmd(memcacheConstants.CMD_RESET_REPLICATION_CHAIN, '', '', '', 0)
View
194 memcacheConstants.py
@@ -0,0 +1,194 @@
#!/usr/bin/env python
"""
Constants for the memcached binary protocol and its membase extensions:
command opcodes, SASL/TAP/vbucket/SYNC commands, struct format strings
for packet headers and per-command extras, and error codes.

Copyright (c) 2007 Dustin Sallings <dustin@spy.net>
"""

import struct

# Command constants
CMD_GET = 0
CMD_SET = 1
CMD_SETQ = 0x11
CMD_ADD = 2
CMD_REPLACE = 3
CMD_DELETE = 4
CMD_INCR = 5
CMD_DECR = 6
CMD_QUIT = 7
CMD_FLUSH = 8
CMD_GETQ = 9
CMD_NOOP = 10
CMD_VERSION = 11
CMD_STAT = 0x10
CMD_APPEND = 0x0e
CMD_PREPEND = 0x0f
CMD_TOUCH = 0x1c
CMD_GAT = 0x1d

# SASL stuff
CMD_SASL_LIST_MECHS = 0x20
CMD_SASL_AUTH = 0x21
CMD_SASL_STEP = 0x22

# Bucket extension
CMD_CREATE_BUCKET = 0x85
CMD_DELETE_BUCKET = 0x86
CMD_LIST_BUCKETS = 0x87
CMD_EXPAND_BUCKET = 0x88
CMD_SELECT_BUCKET = 0x89

CMD_STOP_PERSISTENCE = 0x80
CMD_START_PERSISTENCE = 0x81
CMD_SET_FLUSH_PARAM = 0x82

CMD_START_REPLICATION = 0x90
CMD_STOP_REPLICATION = 0x91
CMD_SET_TAP_PARAM = 0x92
CMD_EVICT_KEY = 0x93

CMD_RESTORE_FILE = 0x98
CMD_RESTORE_ABORT = 0x99
CMD_RESTORE_COMPLETE = 0x9a

#Online update
CMD_START_ONLINEUPDATE = 0x9b
CMD_COMPLETE_ONLINEUPDATE = 0x9c
CMD_REVERT_ONLINEUPDATE = 0x9d

# TAP client registration
CMD_DEREGISTER_TAP_CLIENT = 0x9e

# Reset replication chain
CMD_RESET_REPLICATION_CHAIN = 0x9f

# Replication
CMD_TAP_CONNECT = 0x40
CMD_TAP_MUTATION = 0x41
CMD_TAP_DELETE = 0x42
CMD_TAP_FLUSH = 0x43
CMD_TAP_OPAQUE = 0x44
CMD_TAP_VBUCKET_SET = 0x45
CMD_TAP_CHECKPOINT_START = 0x46
CMD_TAP_CHECKPOINT_END = 0x47

# vbucket stuff
CMD_SET_VBUCKET_STATE = 0x3d
CMD_GET_VBUCKET_STATE = 0x3e
CMD_DELETE_VBUCKET = 0x3f

CMD_GET_LOCKED = 0x94

CMD_SYNC = 0x96

# event IDs for the SYNC command responses
CMD_SYNC_EVENT_PERSISTED = 1
# NOTE(review): name is missing an 'I' ("MODIFIED"); kept as-is because
# callers reference this exact spelling.
CMD_SYNC_EVENT_MODIFED = 2
CMD_SYNC_EVENT_DELETED = 3
CMD_SYNC_EVENT_REPLICATED = 4
CMD_SYNC_INVALID_KEY = 5
CMD_SYNC_INVALID_CAS = 6

VB_STATE_ACTIVE=1
VB_STATE_REPLICA=2
VB_STATE_PENDING=3
VB_STATE_DEAD=4
VB_STATE_NAMES={'active': VB_STATE_ACTIVE,
                'replica': VB_STATE_REPLICA,
                'pending': VB_STATE_PENDING,
                'dead': VB_STATE_DEAD}

# Reverse lookup: opcode value -> "CMD_*" name (handy for debugging).
COMMAND_NAMES = dict(((globals()[k], k) for k in globals() if k.startswith("CMD_")))

# TAP_OPAQUE types
TAP_OPAQUE_ENABLE_AUTO_NACK = 0
TAP_OPAQUE_INITIAL_VBUCKET_STREAM = 1
TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC = 2
TAP_OPAQUE_OPEN_CHECKPOINT = 3

# TAP connect flags
TAP_FLAG_BACKFILL = 0x01
TAP_FLAG_DUMP = 0x02
TAP_FLAG_LIST_VBUCKETS = 0x04
TAP_FLAG_TAKEOVER_VBUCKETS = 0x08
TAP_FLAG_SUPPORT_ACK = 0x10
TAP_FLAG_REQUEST_KEYS_ONLY = 0x20
TAP_FLAG_CHECKPOINT = 0x40
TAP_FLAG_REGISTERED_CLIENT = 0x80

# struct formats for the optional value carried by certain connect flags
TAP_FLAG_TYPES = {TAP_FLAG_BACKFILL: ">Q",
                  TAP_FLAG_REGISTERED_CLIENT: ">B"}

# TAP per-message flags
TAP_FLAG_ACK = 0x01
TAP_FLAG_NO_VALUE = 0x02 # The value for the key is not included in the packet

# Flags, expiration
SET_PKT_FMT=">II"

# flags
GET_RES_FMT=">I"

# How long until the deletion takes effect.
DEL_PKT_FMT=""

## TAP stuff
# eng-specific length, flags, ttl, [res, res, res]; item flags, exp
TAP_MUTATION_PKT_FMT = ">HHbxxxII"
TAP_GENERAL_PKT_FMT = ">HHbxxx"

# amount, initial value, expiration
INCRDECR_PKT_FMT=">QQI"
# Special incr expiration that means do not store
INCRDECR_SPECIAL=0xffffffff
INCRDECR_RES_FMT=">Q"

# Time bomb
FLUSH_PKT_FMT=">I"

# Touch commands
# expiration
TOUCH_PKT_FMT=">I"
GAT_PKT_FMT=">I"
GETL_PKT_FMT=">I"

# 2 bit integer. :/
VB_SET_PKT_FMT=">I"

MAGIC_BYTE = 0x80
REQ_MAGIC_BYTE = 0x80
RES_MAGIC_BYTE = 0x81

# magic, opcode, keylen, extralen, datatype, vbucket, bodylen, opaque, cas
REQ_PKT_FMT=">BBHBBHIIQ"
# magic, opcode, keylen, extralen, datatype, status, bodylen, opaque, cas
RES_PKT_FMT=">BBHBBHIIQ"
# min recv packet size
MIN_RECV_PACKET = struct.calcsize(REQ_PKT_FMT)
# The header sizes don't deviate
assert struct.calcsize(REQ_PKT_FMT) == struct.calcsize(RES_PKT_FMT)

# Per-opcode "extras" format for requests that carry extra header fields.
EXTRA_HDR_FMTS={
    CMD_SET: SET_PKT_FMT,
    CMD_ADD: SET_PKT_FMT,
    CMD_REPLACE: SET_PKT_FMT,
    CMD_INCR: INCRDECR_PKT_FMT,
    CMD_DECR: INCRDECR_PKT_FMT,
    CMD_DELETE: DEL_PKT_FMT,
    CMD_FLUSH: FLUSH_PKT_FMT,
    CMD_TAP_MUTATION: TAP_MUTATION_PKT_FMT,
    CMD_TAP_DELETE: TAP_GENERAL_PKT_FMT,
    CMD_TAP_FLUSH: TAP_GENERAL_PKT_FMT,
    CMD_TAP_OPAQUE: TAP_GENERAL_PKT_FMT,
    CMD_TAP_VBUCKET_SET: TAP_GENERAL_PKT_FMT,
}

EXTRA_HDR_SIZES=dict(
    [(k, struct.calcsize(v)) for (k,v) in EXTRA_HDR_FMTS.items()])

ERR_UNKNOWN_CMD = 0x81
ERR_NOT_FOUND = 0x1
ERR_EXISTS = 0x2
ERR_AUTH = 0x20
ERR_AUTH_CONTINUE = 0x21
View
453 node.py
@@ -0,0 +1,453 @@
+"""
+ Implementation for rebalance, add, remove, stop rebalance.
+"""
+
+import time
+import os
+import sys
+import util_cli as util
+import socket
+
+from usage import usage
+from restclient import *
+from listservers import *
+
+# the rest commands and associated URIs for various node operations
+
# REST endpoint URI for each supported node/cluster operation.
rest_cmds = {
    'rebalance'         :'/controller/rebalance',
    'rebalance-stop'    :'/controller/stopRebalance',
    'rebalance-status'  :'/pools/default/rebalanceProgress',
    'server-add'        :'/controller/addNode',
    'server-readd'      :'/controller/reAddNode',
    'failover'          :'/controller/failOver',
    'cluster-init'      :'/settings/web',
    'node-init'         :'/nodes/self/controller/settings',
}

# Commands for which --server-remove is not a valid option.
server_no_remove = [
    'rebalance-stop',
    'rebalance-status',
    'server-add',
    'server-readd',
    'failover'
]
# Commands for which --server-add is not a valid option.
server_no_add = [
    'rebalance-stop',
    'rebalance-status',
    'failover',
]

# Map of operations and the HTTP methods used against the REST interface

methods = {
    'rebalance'         :'POST',
    'rebalance-stop'    :'POST',
    'rebalance-status'  :'GET',
    'eject-server'      :'POST',
    'server-add'        :'POST',
    'server-readd'      :'POST',
    'failover'          :'POST',
    'cluster-init'      :'POST',
    'node-init'         :'POST',
}
+
+# Map of HTTP success code, success message and error message for
+# handling HTTP response properly
+
+class Node:
+ def __init__(self):
+ self.rest_cmd = rest_cmds['rebalance-status']
+ self.method = 'GET'
+ self.debug = False
+ self.server = ''
+ self.port = ''
+ self.user = ''
+ self.password = ''
+ self.params = {}
+ self.output = 'standard'
+ self.password_new = None
+ self.username_new = None
+ self.port_new = None
+ self.per_node_quota = None
+ self.data_path = None
+
+ def runCmd(self, cmd, server, port,
+ user, password, opts):
+ self.rest_cmd = rest_cmds[cmd]
+ self.method = methods[cmd]
+ self.server = server
+ self.port = int(port)
+ self.user = user
+ self.password = password
+
+ servers = self.processOpts(cmd, opts)
+
+ if self.debug:
+ print "INFO: servers %s" % servers
+
+ if cmd == 'server-add' and not servers['add']:
+ usage("please list one or more --server-add=HOST[:PORT];"
+ " or use -h for more help.")
+
+ if cmd == 'server-readd' and not servers['add']:
+ usage("please list one or more --server-add=HOST[:PORT];"
+ " or use -h for more help.")
+
+ if cmd in ('server-add', 'rebalance'):
+ self.addServers(servers['add'])
+ if cmd == 'rebalance':
+ self.rebalance(servers)
+
+ if cmd == 'server-readd':
+ self.reAddServers(servers)
+
+ if cmd == 'rebalance-status':
+ output_result = self.rebalanceStatus()
+ print output_result
+
+ if cmd == 'rebalance-stop':
+ output_result = self.rebalanceStop()
+ print output_result
+
+ if cmd == 'failover':
+ if len(servers['failover']) <= 0:
+ usage("please list one or more --server-failover=HOST[:PORT];"
+ " or use -h for more help.")
+
+ self.failover(servers)
+
+ if cmd == 'cluster-init':
+ self.clusterInit()
+
+ if cmd == 'node-init':
+ self.nodeInit()
+
+
+ def clusterInit(self):
+ rest = restclient.RestClient(self.server,
+ self.port,
+ {'debug':self.debug})
+ if self.port_new:
+ rest.setParam('port', self.port_new)
+ else:
+ rest.setParam('port', 'SAME')
+ rest.setParam('initStatus', 'done')
+ if self.username_new:
+ rest.setParam('username', self.username_new)
+ else:
+ rest.setParam('username', self.user)
+ if self.password_new:
+ rest.setParam('password', self.password_new)
+ else:
+ rest.setParam('password', self.password)
+
+ opts = {}
+ opts['error_msg'] = "unable to init %s" % self.server
+ opts['success_msg'] = "init %s" % self.server
+
+ output_result = rest.restCmd(self.method,
+ self.rest_cmd,
+ self.user,
+ self.password,
+ opts)
+ print output_result
+
+ # per node quota unfortunately runs against a different location
+ if not self.per_node_quota:
+ return
+
+ if self.port_new:
+ self.port = int(self.port_new)
+ if self.username_new:
+ self.user = self.username_new
+ if self.password_new:
+ self.password = self.password_new
+
+ rest = restclient.RestClient(self.server,
+ self.port,
+ {'debug':self.debug})
+ if self.per_node_quota:
+ rest.setParam('memoryQuota', self.per_node_quota)
+
+ output_result = rest.restCmd(self.method,
+ '/pools/default',
+ self.user,
+ self.password,
+ opts)
+ print output_result
+
+
+ def nodeInit(self):
+ rest = restclient.RestClient(self.server,
+ self.port,
+ {'debug':self.debug})
+ if self.data_path:
+ rest.setParam('path', self.data_path)
+
+ opts = {}
+ opts['error_msg'] = "unable to init %s" % self.server
+ opts['success_msg'] = "init %s" % self.server
+
+ output_result = rest.restCmd(self.method,
+ self.rest_cmd,
+ self.user,
+ self.password,
+ opts)
+ print output_result
+
+
+ def processOpts(self, cmd, opts):
+ """ Set standard opts.
+ note: use of a server key keeps optional
+ args aligned with server.
+ """
+ servers = {
+ 'add': {},
+ 'remove': {},
+ 'failover': {}
+ }
+
+ # don't allow options that don't correspond to given commands
+
+ for o, a in opts:
+ usage_msg = "option '%s' is not used with command '%s'" % (o, cmd)
+
+ if o in ( "-r", "--server-remove"):
+ if cmd in server_no_remove:
+ usage(usage_msg)
+ elif o in ( "-a", "--server-add",
+ "--server-add-username",
+ "--server-add-password"):
+ if cmd in server_no_add:
+ usage(usage_msg)
+
+ server = None
+
+ for o, a in opts:
+ if o in ("-a", "--server-add"):
+ if a == "self":
+ a = socket.gethostbyname(socket.getfqdn())
+ server = "%s:%d" % util.hostport(a)
+ servers['add'][server] = { 'user':'', 'password':''}
+ elif o == "--server-add-username":
+ if server is None:
+ usage("please specify --server-add"
+ " before --server-add-username")
+ servers['add'][server]['user'] = a
+ elif o == "--server-add-password":
+ if server is None:
+ usage("please specify --server-add"
+ " before --server-add-password")
+ servers['add'][server]['password'] = a
+ elif o in ( "-r", "--server-remove"):
+ server = "%s:%d" % util.hostport(a)
+ servers['remove'][server] = True
+ server = None
+ elif o in ( "--server-failover"):
+ server = "%s:%d" % util.hostport(a)
+ servers['failover'][server] = True
+ server = None
+ elif o in ('-o', '--output'):
+ if a == 'json':
+ self.output = a
+ server = None
+ elif o in ('-d', '--debug'):
+ self.debug = True
+ server = None
+ elif o == '--cluster-init-password':
+ self.password_new = a
+ elif o == '--cluster-init-username':
+ self.username_new = a
+ elif o == '--cluster-init-port':
+ self.port_new = a
+ elif o == '--cluster-init-ramsize':
+ self.per_node_quota = a
+ elif o == '--node-init-data-path':
+ self.data_path = a
+
+ return servers
+
+ def addServers(self, servers):
+ for server in servers:
+ user = servers[server]['user']
+ password = servers[server]['password']
+ output_result = self.serverAdd(server,
+ user,
+ password)
+ print output_result
+
+ def serverAdd(self, add_server, add_with_user, add_with_password):
+ rest = restclient.RestClient(self.server,
+ self.port,
+ {'debug':self.debug})
+ rest.setParam('hostname', add_server)
+ if add_with_user and add_with_password:
+ rest.setParam('user', add_with_user)
+ rest.setParam('password', add_with_password)
+
+ opts = {}
+ opts['error_msg'] = "unable to server-add %s" % add_server
+ opts['success_msg'] = "server-add %s" % add_server
+
+ output_result = rest.restCmd('POST',
+ rest_cmds['server-add'],
+ self.user,
+ self.password,
+ opts)
+ return output_result
+
+ def reAddServers(self, servers):
+ known_otps, eject_otps, failover_otps, readd_otps = \
+ self.getNodeOtps(to_readd=servers['add'])
+
+ for readd_otp in readd_otps:
+ rest = restclient.RestClient(self.server,
+ self.port,
+ {'debug':self.debug})
+ rest.setParam('otpNode', readd_otp)
+
+ opts = {}
+ opts['error_msg'] = "unable to re-add %s" % readd_otp
+ opts['success_msg'] = "re-add %s" % readd_otp
+
+ output_result = rest.restCmd('POST',
+ rest_cmds['server-readd'],
+ self.user,
+ self.password,
+ opts)
+ print output_result
+
+ def getNodeOtps(self, to_eject=[], to_failover=[], to_readd=[]):
+ """ Convert known nodes into otp node id's.
+ """
+ listservers = ListServers()
+ known_nodes_list = listservers.getNodes(
+ listservers.getData(self.server,
+ self.port,
+ self.user,
+ self.password))
+ known_otps = []
+ eject_otps = []
+ failover_otps = []
+ readd_otps = []
+
+ for node in known_nodes_list:
+ known_otps.append(node['otpNode'])
+ if node['hostname'] in to_eject:
+ eject_otps.append(node['otpNode'])
+ if node['hostname'] in to_failover:
+ failover_otps.append(node['otpNode'])
+ if node['hostname'] in to_readd:
+ readd_otps.append(node['otpNode'])
+
+ return (known_otps, eject_otps, failover_otps, readd_otps)
+
+ def rebalance(self, servers):
+ known_otps, eject_otps, failover_otps, readd_otps = \
+ self.getNodeOtps(to_eject=servers['remove'])
+
+ rest = restclient.RestClient(self.server,
+ self.port,
+ {'debug':self.debug})
+ rest.setParam('knownNodes', ','.join(known_otps))
+ rest.setParam('ejectedNodes', ','.join(eject_otps))
+
+ opts = {}
+ opts['success_msg'] = 'rebalanced cluster'
+ opts['error_msg'] = 'unable to rebalance cluster'
+
+ output_result = rest.restCmd('POST',
+ rest_cmds['rebalance'],
+ self.user,
+ self.password,
+ opts)
+ if self.debug:
+ print "INFO: rebalance started: %s" % output_result
+
+ sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
+
+ print "INFO: rebalancing",
+
+ status, error = self.rebalanceStatus(prefix='\n')
+ while status == 'running':
+ print ".",
+ time.sleep(0.5)
+ try:
+ status, error = self.rebalanceStatus(prefix='\n')
+ except socket.error:
+ time.sleep(2)
+ status, error = self.rebalanceStatus(prefix='\n')
+
+ if error:
+ print '\n' + error
+ sys.exit(1)
+ else:
+ print '\n' + output_result
+
+ def rebalanceStatus(self, prefix=''):
+ rest = restclient.RestClient(self.server,
+ self.port,
+ {'debug':self.debug})
+ opts = { 'error_msg':'unable to obtain rebalance status'}
+
+ output_result = rest.restCmd('GET',
+ rest_cmds['rebalance-status'],
+ self.user,
+ self.password,
+ opts)
+
+ json = rest.getJson(output_result)
+ if type(json) == type(list()):
+ print prefix + ("ERROR: %s" % json[0])
+ sys.exit(1)
+
+ if 'errorMessage' in json:
+ error_message = json['errorMessage']
+ else:
+ error_message = None
+
+ return json['status'],error_message
+
+ def rebalanceStop(self):
+ rest = restclient.RestClient(self.server,
+ self.port,
+ {'debug':self.debug})
+
+ opts = {}
+ opts['success_msg'] = 'rebalance cluster stopped'
+ opts['error_msg'] = 'unable to stop rebalance'
+
+ output_result = rest.restCmd('POST',
+ rest_cmds['rebalance-stop'],
+ self.user,
+ self.password,
+ opts)
+ return output_result
+
+
+ def failover(self, servers):
+ known_otps, eject_otps, failover_otps, readd_otps = \
+ self.getNodeOtps(to_failover=servers['failover'])
+
+ if len(failover_otps) <= 0:
+ usage("specified servers are not part of the cluster: %s" %
+ servers['failover'].keys())
+
+ for failover_otp in failover_otps:
+ rest = restclient.RestClient(self.server,
+ self.port,
+ {'debug':self.debug})
+ rest.setParam('otpNode', failover_otp)
+
+ opts = {}
+ opts['error_msg'] = "unable to failover %s" % failover_otp
+ opts['success_msg'] = "failover %s" % failover_otp
+
+ output_result = rest.restCmd('POST',
+ rest_cmds['failover'],
+ self.user,
+ self.password,
+ opts)
+ print output_result
+
View
163 restclient.py
@@ -0,0 +1,163 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+ methods for contacting to a HTTP server, sending REST commands
+ and processing the JSON response
+"""
+
+import sys
+import socket
+import httplib
+import urllib
+import base64
+import simplejson as json
+import string
+
+from StringIO import StringIO
+
+class RestClient:
+ def __init__(self, server, port, opts= {}):
+ self.server = server
+ self.port = port
+ self.debug = opts.get('debug', False)
+ self.uri = '/pools'
+ self.method = 'GET'
+ self.params = {}
+ self.user = ''
+ self.password = ''
+ self.clientConnect(server, int(port))
+
+ def clientConnect(self, server, port):
+ error_connect = "Unable to connect to %s" % self.server
+ try:
+ self.conn = httplib.HTTPConnection(server, port)
+ except httplib.NotConnected:
+ print error_connect
+ sys.exit(2)
+ except httplib.HTTPException:
+ print error_connect
+ sys.exit(2)
+ except socket.error:
+ print error_connect
+ sys.exit(2)
+ except socket.gaierror:
+ print error_connect
+ sys.exit(2)
+
+ def setParam(self, param, value):
+ self.params[param] = value
+
+ def handleResponse(self,
+ method,
+ response,
+ opts={ 'success_msg':'',
+ 'error_msg':'' }):
+ """ parse response in standard way.
+ """
+ if response.status in [200, 201, 202, 204, 302]:
+ if method == 'GET':
+ return response.read()
+
+ return "SUCCESS: %s" % opts['success_msg']
+
+ if response.status == 401:
+ print 'ERROR: unable to access the REST API - please check your username (-u) and password (-p)'
+ sys.exit(2)
+
+ print 'ERROR: %s (%d) %s' % (opts['error_msg'],
+ response.status, response.reason)
+
+ output_json = json.loads(response.read())
+ print output_json
+ if "errors" in output_json:
+ for error_code,error_message in output_json["errors"].iteritems():
+ print "ERROR: %s" % error_message
+
+ sys.exit(2)
+
+ def bootStrap(self, headers):
+ """ First REST call needed for info for later REST calls.
+ """
+ self.conn.request('GET', '/pools', '', headers)
+ response = self.conn.getresponse()
+
+ opts = {'error_msg':'bootstrap failed'}
+ return self.handleResponse('GET', response, opts)
+
+ def sendCmd(self, method, uri,
+ user='', password='', opts = {}):
+ """
+ sendCmd()
+ This method handles accessing the REST API and returning
+ either data, if a GET, or a success or error message if a POST
+ """
+ data = ''
+ headers = {}
+ encoded_params = ''
+
+ if user and password:
+ self.user = user
+ self.password = password
+
+ auth = ('Basic ' +
+ string.strip(base64.encodestring(user + ':' + password)))
+
+ headers['Authorization'] = auth
+
+ self.bootStrap(headers)
+
+ if method == 'POST':
+ encoded_params = urllib.urlencode(self.params)
+ headers['Content-type'] = 'application/x-www-form-urlencoded'
+ elif method == 'DELETE':
+ encoded_params = urllib.urlencode(self.params)
+ headers['Content-type'] = 'application/x-www-form-urlencoded'
+ else:
+ if self.params:
+ uri = uri, '?', urllib.urlencode(self.params)
+
+ if self.debug:
+ print "METHOD: %s" % method
+ print "PARAMS: ", self.params
+ print "ENCODED_PARAMS: %s" % encoded_params
+ print "REST CMD: %s %s" % (method,uri)
+
+ self.makeRequest(method, uri, encoded_params, headers)
+
+ response = self.conn.getresponse()
+ if self.debug:
+ print "response.status: %s" % response.status
+ return response
+
+ def makeRequest(self, method, uri, encoded_params, headers):
+ error_connect = "ERROR: unable to connect to %s:%d" % (self.server, self.port)
+ try:
+ self.conn.request(method, uri, encoded_params, headers)
+ except httplib.NotConnected:
+ print error_connect
+ sys.exit(2)
+ except httplib.HTTPException:
+ print error_connect
+ sys.exit(2)
+ except socket.error:
+ print error_connect
+ sys.exit(2)
+ except socket.gaierror:
+ print error_connect
+ sys.exit(2)
+
+ def getJson(self, data):
+ return json.loads(data)
+
+ def jsonMessage(self, data):
+ return json.JSONEncoder().encode(data)
+
+ def restCmd(self, method, uri, user='', password='', opts={}):
+ if method == None:
+ method = 'GET'
+
+ response = self.sendCmd(method, uri,
+ user, password, opts)
+
+ return self.handleResponse(method, response, opts)
View
162 usage.py
@@ -0,0 +1,162 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import sys
+
def commands_usage():
    """Return the per-command summary block shared by both the short and
    the long usage text (starts and ends with a newline)."""
    summary = """
  server-list list all servers in a cluster
  server-info show details on one server
  server-add add one or more servers to the cluster
  server-readd readd a server that was failed over
  rebalance start a cluster rebalancing
  rebalance-stop stop current cluster rebalancing
  rebalance-status show status of current cluster rebalancing
  failover failover one or more servers
  cluster-init set the username,password and port of the cluster
  node-init set node specific parameters
  bucket-list list all buckets in a cluster
  bucket-create add a new bucket to the cluster
  bucket-edit modify an existing bucket
  bucket-delete delete an existing bucket
  bucket-flush flush a given bucket
  help show longer usage/help and examples
"""
    return summary
+
def short_usage():
    """Print the brief usage text (syntax plus command list) and exit(2)."""
    print "usage: couchbase-cli COMMAND CLUSTER [OPTIONS]"
    print ""
    print "CLUSTER is --cluster=HOST[:PORT] or -c HOST[:PORT]"
    print ""
    print "COMMANDs include" + commands_usage()

    sys.exit(2)
+
def usage(error_msg=''):
    """Print help and exit(2).

    With a non-empty error_msg, print only "ERROR: <msg>" and exit;
    otherwise print the full long-form usage text with examples.
    """
    if error_msg:
        print "ERROR: %s" % error_msg
        sys.exit(2)

    print """couchbase-cli - command-line cluster administration tool

usage: couchbase-cli COMMAND CLUSTER [OPTIONS]

COMMAND:""" + commands_usage() + """
CLUSTER:
  --cluster=HOST[:PORT] or -c HOST[:PORT]

OPTIONS:
  -u USERNAME, --user=USERNAME admin username of the cluster
  -p PASSWORD, --password=PASSWORD admin password of the cluster
  -o KIND, --output=KIND KIND is json or standard
  -d, --debug

server-add OPTIONS:
  --server-add=HOST[:PORT] server to be added
  --server-add-username=USERNAME admin username for the
  server to be added
  --server-add-password=PASSWORD admin password for the
  server to be added

server-readd OPTIONS:
  --server-add=HOST[:PORT] server to be added
  --server-add-username=USERNAME admin username for the
  server to be added
  --server-add-password=PASSWORD admin password for the
  server to be added

rebalance OPTIONS:
  --server-add* see server-add OPTIONS
  --server-remove=HOST[:PORT] the server to be removed

failover OPTIONS:
  --server-failover=HOST[:PORT] server to failover

cluster-init OPTIONS:
  --cluster-init-username=USER new admin username
  --cluster-init-password=PASSWORD new admin password
  --cluster-init-port=PORT new cluster REST/http port
  --cluster-init-ramsize=RAMSIZEMB per node ram quota in MB

node-init OPTIONS:
  --node-init-data-path=PATH per node path to store data

bucket-* OPTIONS:
  --bucket=BUCKETNAME bucket to act on
  --bucket-type=TYPE memcached or membase
  --bucket-port=PORT supports ASCII protocol and is auth-less
  --bucket-password=PASSWORD standard port, exclusive with bucket-port
  --bucket-ramsize=RAMSIZEMB ram quota in MB
  --bucket-replica=COUNT replication count

The default PORT number is 8091.

EXAMPLES:
  List servers in a cluster:
    couchbase-cli server-list -c 192.168.0.1:8091

  Server information:
    couchbase-cli server-info -c 192.168.0.1:8091

  Add a node to a cluster, but do not rebalance:
    couchbase-cli server-add -c 192.168.0.1:8091 \\
       --server-add=192.168.0.2:8091

  Add a node to a cluster and rebalance:
    couchbase-cli rebalance -c 192.168.0.1:8091 \\
       --server-add=192.168.0.2:8091

  Remove a node from a cluster and rebalance:
    couchbase-cli rebalance -c 192.168.0.1:8091 \\
       --server-remove=192.168.0.2:8091

  Remove and add nodes from/to a cluster and rebalance:
    couchbase-cli rebalance -c 192.168.0.1:8091 \\
      --server-remove=192.168.0.2 \\
      --server-add=192.168.0.4

  Stop the current rebalancing:
    couchbase-cli rebalance-stop -c 192.168.0.1:8091

  Change the username, password, port and ram quota:
    couchbase-cli cluster-init -c 192.168.0.1:8091 \\
       --cluster-init-username=Administrator \\
       --cluster-init-password=password \\
       --cluster-init-port=8080 \\
       --cluster-init-ramsize=300

  Change the data path:
     couchbase-cli node-init -c 192.168.0.1:8091 \\
       --node-init-data-path=/tmp

  List buckets in a cluster:
    couchbase-cli bucket-list -c 192.168.0.1:8091

  Create a new dedicated port membase bucket:
    couchbase-cli bucket-create -c 192.168.0.1:8091 \\
       --bucket=test_bucket \\
       --bucket-type=membase \\
       --bucket-port=11222 \\
       --bucket-ramsize=200 \\
       --bucket-replica=1

  Create a new sasl memcached bucket:
    couchbase-cli bucket-create -c 192.168.0.1:8091 \\
       --bucket=test_bucket \\
       --bucket-type=memcached \\
       --bucket-password=password \\
       --bucket-ramsize=200

  Modify a dedicated port bucket:
    couchbase-cli bucket-edit -c 192.168.0.1:8091 \\
       --bucket=test_bucket \\
       --bucket-port=11222 \\
       --bucket-ramsize=400

  Delete a bucket:
    couchbase-cli bucket-delete -c 192.168.0.1:8091 \\
       --bucket=test_bucket

"""

    sys.exit(2)
View
13 util_cli.py
@@ -0,0 +1,13 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+def hostport(hoststring, default_port=8091):
+ """ finds the host and port given a host:port string """
+ try:
+ host, port = hoststring.split(':')
+ port = int(port)
+ except ValueError:
+ host = hoststring
+ port = default_port
+