
Fixed all PEP8 errors

Change-Id: Icd01d8ba68394993be81e3c2efcaed6a593bc1f6
Reviewed-on: http://review.couchbase.org/16344
Reviewed-by: Volker Mische <volker.mische@gmail.com>
Tested-by: Michael Wiederhold <mike@couchbase.com>
1 parent 425e15e · commit d94edbbb20b2c7605a34ac29939519360927876c · mikewied committed May 23, 2012
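
Most of the changes below are fixes for PEP8's 79-character limit (E501): a long call or literal is wrapped inside its parentheses so that the continuation lines align with the opening delimiter. A minimal sketch of the pattern, adapted from the client.py hunk below (uri stands in for the full URL string):

    # before: the whole call on one line, well past 79 characters
    status, content = rest._http_request(uri, method='GET', params='', headers=None, timeout=120)

    # after: implied continuation inside the parentheses, aligned arguments
    status, content = rest._http_request(uri, method='GET', params='',
                                         headers=None, timeout=120)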
75 couchbase/client.py
@@ -17,15 +17,14 @@
import uuid
try:
- import json
+ import json
except:
- import simplejson as json
+ import simplejson as json
import time
from copy import deepcopy
from threading import Thread, Lock
import urllib
import warnings
-
import logging
from rest_client import RestConnection
@@ -41,10 +40,10 @@ def __init__(self, host, username, password):
else:
[ip, port] = host, 8091
- server = {'ip':ip,
- 'port':port,
- 'username':username,
- 'password':password
+ server = {'ip': ip,
+ 'port': port,
+ 'username': username,
+ 'password': password
}
self.servers = [server]
@@ -53,26 +52,31 @@ def __init__(self, host, username, password):
self.rest_username = username
self.rest_password = password
- server_config_uri = "http://%s:%s/pools/default" % (server['ip'], server['port'])
- config = ServerHelper.parse_server_config(server_config_uri, username, password)
+ server_config_uri = "http://%s:%s/pools/default" % (server['ip'],
+ server['port'])
+ config = ServerHelper.parse_server_config(server_config_uri, username,
+ password)
#couchApiBase will not be in node config before Couchbase Server 2.0
self.couch_api_base = config["nodes"][0].get("couchApiBase")
- self.streaming_thread = Thread(name="streaming", target=self._start_streaming, args=())
+ self.streaming_thread = Thread(name="streaming",
+ target=self._start_streaming, args=())
self.streaming_thread.daemon = True
self.streaming_thread.start()
def _start_streaming(self):
# this will dynamically update servers
urlopener = urllib.FancyURLopener()
- urlopener.prompt_user_passwd = lambda host, realm: (self.rest_username, self.rest_password)
+        urlopener.prompt_user_passwd = (
+            lambda host, realm: (self.rest_username, self.rest_password))
current_servers = True
while current_servers:
self.servers_lock.acquire()
current_servers = deepcopy(self.servers)
self.servers_lock.release()
for server in current_servers:
- url = "http://%s:%s/poolsStreaming/default" % (server["ip"], server["port"])
+ url = "http://%s:%s/poolsStreaming/default" % (server["ip"],
+ server["port"])
f = urlopener.open(url)
while f:
try:
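
A note on the prompt_user_passwd override above: urllib.FancyURLopener invokes it as prompt_user_passwd(host, realm) when a server answers 401, so the replacement lambda must keep both parameters even though it ignores them. A minimal sketch (credentials are placeholders):

    import urllib

    opener = urllib.FancyURLopener()
    # called by the opener on HTTP 401; both arguments are required
    opener.prompt_user_passwd = lambda host, realm: ('Administrator',
                                                     'password')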
@@ -93,14 +97,15 @@ def _start_streaming(self):
new_servers = []
nodes = data["nodes"]
for node in nodes:
- if node["clusterMembership"] == "active" and node["status"] == "healthy":
- hostport = node["hostname"]
+ if (node["clusterMembership"] == "active" and
+ node["status"] == "healthy"):
+ ip, port = node["hostname"].split(":")
couch_api_base = node.get("couchApiBase")
- new_servers.append({"ip":hostport.split(":")[0],
- "port":int(hostport.split(":")[1]),
- "username":self.rest_username,
- "password":self.rest_password,
- "couchApiBase" : couch_api_base
+ new_servers.append({"ip": ip,
+                                            "port": int(port),
+ "username": self.rest_username,
+ "password": self.rest_password,
+ "couchApiBase": couch_api_base
})
new_servers.sort()
self.servers_lock.acquire()
@@ -118,7 +123,8 @@ def buckets(self):
buckets.append(Bucket(rest_bucket.name, self))
return buckets
- def create(self, bucket_name, bucket_password='', ram_quota_mb=100, replica=0):
+ def create(self, bucket_name, bucket_password='', ram_quota_mb=100,
+ replica=0):
rest = self._rest()
rest.create_bucket(bucket=bucket_name,
ramQuotaMB=ram_quota_mb,
@@ -131,7 +137,11 @@ def create(self, bucket_name, bucket_password='', ram_quota_mb=100, replica=0):
while True:
try:
content = '{"basicStats":{"quotaPercentUsed":0.0}}'
- status, content = rest._http_request("http://%s:%s/pools/default/buckets/%s" % (ip, port, bucket_name), method='GET', params='', headers=None, timeout=120)
+ formatter_uri = "http://%s:%s/pools/default/buckets/%s"
+ status, content = rest._http_request(formatter_uri %
+ (ip, port, bucket_name),
+ method='GET', params='',
+ headers=None, timeout=120)
except ValueError:
pass
if json.loads(content)['basicStats']['quotaPercentUsed'] > 0.0:
@@ -165,12 +175,14 @@ def _rest_info(self):
self.servers_lock.acquire()
server_info = deepcopy(self.servers[0])
self.servers_lock.release()
- return server_info['ip'], server_info['port'], server_info['username'], server_info['password']
+ return (server_info['ip'], server_info['port'],
+ server_info['username'], server_info['password'])
class Server(Couchbase):
def __init__(self, host, username, password):
- warnings.warn("Server is deprecated; use Couchbase instead", DeprecationWarning)
+ warnings.warn("Server is deprecated; use Couchbase instead",
+ DeprecationWarning)
Couchbase.__init__(self, host, username, password)
@@ -197,7 +209,11 @@ def __init__(self, bucket_name, server):
self.bucket_password = rest.get_bucket(bucket_name).saslPassword
ip, port, rest_username, rest_password = server._rest_info()
- self.mc_client = VBucketAwareCouchbaseClient("http://%s:%s/pools/default" % (ip, port), self.bucket_name, self.bucket_password)
+ formatter_uri = "http://%s:%s/pools/default"
+ self.mc_client = VBucketAwareCouchbaseClient(formatter_uri %
+ (ip, port),
+ self.bucket_name,
+ self.bucket_password)
def append(self, key, value, cas=0):
return self.mc_client.append(key, value, cas)
@@ -280,7 +296,7 @@ def save(self, document):
rest.create_view(self.bucket_name, view, json.dumps(value))
else:
if '_rev' in value:
- # couchbase works in clobber mode so for a "set" _rev is useless
+ # couchbase works in clobber mode so for "set" _rev is useless
del value['_rev']
self.set(key, expiration, flags, json.dumps(value))
@@ -312,23 +328,26 @@ def view(self, view, **options):
rest = self.server._rest()
- results = rest.view_results(self.bucket_name, view_doc, view_map, params, limit)
+ results = rest.view_results(self.bucket_name, view_doc, view_map,
+ params, limit)
if 'rows' in results:
return results['rows']
else:
return None
+
class ServerHelper(object):
@staticmethod
def parse_server_config(uri, username="", password=""):
urlopener = urllib.FancyURLopener()
if len(username) > 0 and len(password) > 0:
- urlopener.prompt_user_passwd = lambda host, realm: (username, password)
+            urlopener.prompt_user_passwd = (
+                lambda host, realm: (username, password))
response = urlopener.open(uri)
try:
line = response.readline()
data = json.loads(line)
return data
except:
- raise Exception("unexpected error - unable to parse server config at %s" % (uri))
+ raise Exception("unexpected error - unable to parse server config"
+ " at %s" % (uri))
288 couchbase/couchbaseclient.py
@@ -122,7 +122,8 @@ class MemcachedConstants(object):
'pending': VB_STATE_PENDING,
'dead': VB_STATE_DEAD}
- COMMAND_NAMES = dict(((globals()[k], k) for k in globals() if k.startswith("CMD_")))
+ COMMAND_NAMES = (dict(((globals()[k], k) for k in globals()
+ if k.startswith("CMD_"))))
# TAP_OPAQUE types
TAP_OPAQUE_ENABLE_AUTO_NACK = 0
@@ -145,7 +146,7 @@ class MemcachedConstants(object):
# TAP per-message flags
TAP_FLAG_ACK = 0x01
- TAP_FLAG_NO_VALUE = 0x02 # The value for the key is not included in the packet
+    TAP_FLAG_NO_VALUE = 0x02  # The key's value is not included in the packet
# Flags, expiration
SET_PKT_FMT = ">II"
@@ -222,8 +223,9 @@ class MemcachedError(exceptions.Exception):
"""Error raised when a command fails."""
def __init__(self, status, msg):
- supermsg = 'Memcached error #' + `status`
- if msg: supermsg += ": " + msg
+ supermsg = 'Memcached error #' + repr(status)
+ if msg:
+ supermsg += ": " + msg
exceptions.Exception.__init__(self, supermsg)
self.status = status
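
The backticks replaced here are Python 2 shorthand for repr() and were removed in Python 3, so spelling out repr() is both the PEP8-preferred and the forward-compatible form; the two are equivalent:

    status = 7
    'Memcached error #' + `status`       # Python 2 only
    'Memcached error #' + repr(status)   # same result: 'Memcached error #7'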
@@ -259,7 +261,8 @@ def _sendCmd(self, cmd, key, val, opaque, extraHeader='', cas=0):
def _sendMsg(self, cmd, key, val, opaque, extraHeader='', cas=0,
dtype=0, vbucketId=0,
- fmt=MemcachedConstants.REQ_PKT_FMT, magic=MemcachedConstants.REQ_MAGIC_BYTE):
+ fmt=MemcachedConstants.REQ_PKT_FMT,
+ magic=MemcachedConstants.REQ_MAGIC_BYTE):
msg = struct.pack(fmt, magic,
cmd, len(key), len(extraHeader), dtype, vbucketId,
len(key) + len(extraHeader) + len(val), opaque, cas)
@@ -268,24 +271,28 @@ def _sendMsg(self, cmd, key, val, opaque, extraHeader='', cas=0,
def _recvMsg(self):
response = ""
while len(response) < MemcachedConstants.MIN_RECV_PACKET:
- data = self.s.recv(MemcachedConstants.MIN_RECV_PACKET - len(response))
+ data = self.s.recv(MemcachedConstants.MIN_RECV_PACKET
+ - len(response))
if data == '':
- raise exceptions.EOFError("Got empty data (remote died?). from %s" % (self.host))
+ raise exceptions.EOFError("Got empty data (remote died?)."
+ " from %s" % (self.host))
response += data
assert len(response) == MemcachedConstants.MIN_RECV_PACKET
- magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas = \
+ magic, cmd, keylen, extralen, dtype, errcode, remaining, opaque, cas =\
struct.unpack(MemcachedConstants.RES_PKT_FMT, response)
rv = ""
while remaining > 0:
data = self.s.recv(remaining)
if data == '':
- raise exceptions.EOFError("Got empty data (remote died?). from %s" % (self.host))
+ raise exceptions.EOFError("Got empty data (remote died?)."
+ " from %s" % (self.host))
rv += data
remaining -= len(data)
- assert (
- magic in (MemcachedConstants.RES_MAGIC_BYTE, MemcachedConstants.REQ_MAGIC_BYTE)), "Got magic: %d" % magic
+ assert (magic in (MemcachedConstants.RES_MAGIC_BYTE,
+ MemcachedConstants.REQ_MAGIC_BYTE)),\
+ "Got magic: %d" % magic
return cmd, errcode, opaque, cas, keylen, extralen, rv
def _handleKeyedResponse(self, myopaque):
@@ -297,7 +304,8 @@ def _handleKeyedResponse(self, myopaque):
return cmd, opaque, cas, keylen, extralen, rv
def _handleSingleResponse(self, myopaque):
- cmd, opaque, cas, keylen, extralen, data = self._handleKeyedResponse(myopaque)
+ cmd, opaque, cas, keylen, extralen, data =\
+ self._handleKeyedResponse(myopaque)
return opaque, cas, data
def _doCmd(self, cmd, key, val, extraHeader='', cas=0):
@@ -307,8 +315,9 @@ def _doCmd(self, cmd, key, val, extraHeader='', cas=0):
return self._handleSingleResponse(opaque)
def _mutate(self, cmd, key, exp, flags, cas, val):
- return self._doCmd(cmd, key, val, struct.pack(MemcachedConstants.SET_PKT_FMT, flags, exp),
- cas)
+ return self._doCmd(cmd, key, val,
+ struct.pack(MemcachedConstants.SET_PKT_FMT, flags,
+ exp), cas)
def _cat(self, cmd, key, cas, val):
return self._doCmd(cmd, key, val, '', cas)
@@ -322,35 +331,41 @@ def prepend(self, key, value, cas=0, vbucket=-1):
return self._cat(MemcachedConstants.CMD_PREPEND, key, cas, value)
def __incrdecr(self, cmd, key, amt, init, exp):
- something, cas, val = self._doCmd(cmd, key, '',
- struct.pack(MemcachedConstants.INCRDECR_PKT_FMT, amt, init, exp))
+ something, cas, val =\
+ self._doCmd(cmd, key, '',
+ struct.pack(MemcachedConstants.INCRDECR_PKT_FMT,
+ amt, init, exp))
return struct.unpack(MemcachedConstants.INCRDECR_RES_FMT, val)[0], cas
- def incr(self, key, amt=1, init=0, exp=0,vbucket=-1):
+ def incr(self, key, amt=1, init=0, exp=0, vbucket=-1):
"""Increment or create the named counter."""
self._set_vbucket_id(key, vbucket)
- return self.__incrdecr(MemcachedConstants.CMD_INCR, key, amt, init, exp)
+ return self.__incrdecr(MemcachedConstants.CMD_INCR, key, amt, init,
+ exp)
- def decr(self, key, amt=1, init=0, exp=0,vbucket=-1):
+ def decr(self, key, amt=1, init=0, exp=0, vbucket=-1):
"""Decrement or create the named counter."""
self._set_vbucket_id(key, vbucket)
- return self.__incrdecr(MemcachedConstants.CMD_DECR, key, amt, init, exp)
+ return self.__incrdecr(MemcachedConstants.CMD_DECR, key, amt, init,
+ exp)
def _set_vbucket_id(self, key, vbucket):
- if vbucket ==-1:
+ if vbucket == -1:
self.vbucketId = (zlib.crc32(key) >> 16) & (self.vbucket_count - 1)
else:
self.vbucketId = vbucket
def set(self, key, exp, flags, val, vbucket=-1):
"""Set a value in the memcached server."""
self._set_vbucket_id(key, vbucket)
- return self._mutate(MemcachedConstants.CMD_SET, key, exp, flags, 0, val)
+ return self._mutate(MemcachedConstants.CMD_SET, key, exp, flags, 0,
+ val)
def add(self, key, exp, flags, val, vbucket=-1):
"""Add a value in the memcached server iff it doesn't already exist."""
self._set_vbucket_id(key, vbucket)
- return self._mutate(MemcachedConstants.CMD_ADD, key, exp, flags, 0, val)
+ return self._mutate(MemcachedConstants.CMD_ADD, key, exp, flags, 0,
+ val)
def replace(self, key, exp, flags, val, vbucket=-1):
"""Replace a value in the memcached server iff it already exists."""
@@ -389,14 +404,14 @@ def touch(self, key, exp, vbucket=-1):
struct.pack(MemcachedConstants.TOUCH_PKT_FMT, exp))
def gat(self, key, exp, vbucket=-1):
- """Get the value for a given key and touch it within the memcached server."""
+ """Get the value for a given key and touch it."""
self._set_vbucket_id(key, vbucket)
parts = self._doCmd(MemcachedConstants.CMD_GAT, key, '',
struct.pack(MemcachedConstants.GAT_PKT_FMT, exp))
return self.__parseGet(parts)
def version(self):
- """Get the value for a given key within the memcached server."""
+ """Get the version of the server."""
return self._doCmd(MemcachedConstants.CMD_VERSION, '', '')
def sasl_mechanisms(self):
@@ -411,7 +426,8 @@ def sasl_auth_start(self, mech, data):
def sasl_auth_plain(self, user, password, foruser=''):
"""Perform plain auth."""
- return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user, password]))
+ return self.sasl_auth_start('PLAIN', '\0'.join([foruser, user,
+ password]))
def sasl_auth_cram_md5(self, user, password):
"""Start a plan auth session."""
@@ -446,7 +462,8 @@ def start_onlineupdate(self):
return self._doCmd(MemcachedConstants.CMD_START_ONLINEUPDATE, '', '')
def complete_onlineupdate(self):
- return self._doCmd(MemcachedConstants.CMD_COMPLETE_ONLINEUPDATE, '', '')
+ return self._doCmd(MemcachedConstants.CMD_COMPLETE_ONLINEUPDATE, '',
+ '')
def revert_onlineupdate(self):
return self._doCmd(MemcachedConstants.CMD_REVERT_ONLINEUPDATE, '', '')
@@ -505,7 +522,8 @@ def stats(self, sub=''):
done = False
rv = {}
while not done:
- cmd, opaque, cas, klen, extralen, data = self._handleKeyedResponse(None)
+ cmd, opaque, cas, klen, extralen, data =\
+ self._handleKeyedResponse(None)
if klen:
rv[data[0:klen]] = data[klen:]
else:
@@ -524,36 +542,44 @@ def delete(self, key, cas=0, vbucket=-1):
def flush(self, timebomb=0):
"""Flush all storage in a memcached instance."""
return self._doCmd(MemcachedConstants.CMD_FLUSH, '', '',
- struct.pack(MemcachedConstants.FLUSH_PKT_FMT, timebomb))
+ struct.pack(MemcachedConstants.FLUSH_PKT_FMT,
+ timebomb))
def bucket_select(self, name):
return self._doCmd(MemcachedConstants.CMD_SELECT_BUCKET, name, '')
def sync_persistence(self, keyspecs):
payload = self._build_sync_payload(0x8, keyspecs)
- opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "", payload)
+ opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "",
+ payload)
return opaque, cas, self._parse_sync_response(data)
def sync_mutation(self, keyspecs):
payload = self._build_sync_payload(0x4, keyspecs)
- opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "", payload)
+ opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "",
+ payload)
return opaque, cas, self._parse_sync_response(data)
def sync_replication(self, numReplicas, keyspecs):
payload = self._build_sync_payload((numReplicas & 0x0f) << 4, keyspecs)
- opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "", payload)
+ opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "",
+ payload)
return opaque, cas, self._parse_sync_response(data)
def sync_replication_or_persistence(self, numReplicas, keyspecs):
- payload = self._build_sync_payload(((numReplicas & 0x0f) << 4) | 0x8, keyspecs)
+ payload = self._build_sync_payload(((numReplicas & 0x0f) << 4) |
+ 0x8, keyspecs)
- opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "", payload)
+ opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "",
+ payload)
return opaque, cas, self._parse_sync_response(data)
def sync_replication_and_persistence(self, numReplicas, keyspecs):
- payload = self._build_sync_payload(((numReplicas & 0x0f) << 4) | 0xA, keyspecs)
+ payload = self._build_sync_payload(((numReplicas & 0x0f) << 4) |
+ 0xA, keyspecs)
- opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "", payload)
+ opaque, cas, data = self._doCmd(MemcachedConstants.CMD_SYNC, "",
+ payload)
return opaque, cas, self._parse_sync_response(data)
def _build_sync_payload(self, flags, keyspecs):
@@ -563,9 +589,9 @@ def _build_sync_payload(self, flags, keyspecs):
for spec in keyspecs:
if not isinstance(spec, dict):
raise TypeError("each keyspec must be a dict")
- if not spec.has_key('vbucket'):
+ if 'vbucket' not in spec:
raise TypeError("missing vbucket property in keyspec")
- if not spec.has_key('key'):
+ if 'key' not in spec:
raise TypeError("missing key property in keyspec")
payload += struct.pack(">Q", spec.get('cas', 0))
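
dict.has_key() was deprecated in Python 2 and removed in Python 3; the in operator is the equivalent membership test:

    spec = {'vbucket': 0, 'key': 'k', 'cas': 0}
    'vbucket' in spec          # True; the preferred spelling
    spec.has_key('vbucket')    # True, but Python 2 only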
@@ -582,7 +608,8 @@ def _parse_sync_response(self, data):
for i in xrange(nkeys):
spec = {}
width = struct.calcsize("QHHB")
- spec['cas'], spec['vbucket'], keylen, eventid = struct.unpack(">QHHB", data[offset: offset + width])
+ spec['cas'], spec['vbucket'], keylen, eventid =\
+ struct.unpack(">QHHB", data[offset: offset + width])
offset += width
spec['key'] = data[offset: offset + keylen]
@@ -616,7 +643,8 @@ def restore_complete(self):
def deregister_tap_client(self, tap_name):
"""Deregister the TAP client with a given name."""
- return self._doCmd(MemcachedConstants.CMD_DEREGISTER_TAP_CLIENT, tap_name, '')
+ return self._doCmd(MemcachedConstants.CMD_DEREGISTER_TAP_CLIENT,
+ tap_name, '')
class VBucketAwareCouchbaseClient(object):
@@ -635,11 +663,15 @@ def __init__(self, url, bucket, password="", verbose=False):
#TODO: use regular expressions to parse the url
server = {}
if not bucket:
- raise InvalidArgumentException("bucket can not be an empty string", parameters="bucket")
+ raise InvalidArgumentException("bucket can not be an empty string",
+ parameters="bucket")
if not url:
- raise InvalidArgumentException("url can not be an empty string", parameters="url")
- if url.find("http://") != -1 and url.rfind(":") != -1 and url.find("/pools/default") != -1:
- server["ip"] = url[url.find("http://") + len("http://"):url.rfind(":")]
+ raise InvalidArgumentException("url can not be an empty string",
+ parameters="url")
+ if (url.find("http://") != -1 and url.rfind(":") != -1 and
+ url.find("/pools/default") != -1):
+ server["ip"] = (url[url.find("http://")
+ + len("http://"):url.rfind(":")])
server["port"] = url[url.rfind(":") + 1:url.find("/pools/default")]
server["username"] = self.rest_username
server["password"] = self.rest_password
@@ -649,10 +681,12 @@ def __init__(self, url, bucket, password="", verbose=False):
self.reconfig_vbucket_map()
self.init_vbucket_connections()
self.dispatcher = CommandDispatcher(self)
- self.dispatcher_thread = Thread(name="dispatcher-thread", target=self._start_dispatcher)
+ self.dispatcher_thread = Thread(name="dispatcher-thread",
+ target=self._start_dispatcher)
self.dispatcher_thread.daemon = True
self.dispatcher_thread.start()
- self.streaming_thread = Thread(name="streaming", target=self._start_streaming, args=())
+ self.streaming_thread = Thread(name="streaming",
+ target=self._start_streaming, args=())
self.streaming_thread.daemon = True
self.streaming_thread.start()
self.verbose = verbose
@@ -661,16 +695,20 @@ def _start_dispatcher(self):
self.dispatcher.dispatch()
def _start_streaming(self):
- # this will dynamically update vBucketMap, vBucketMapFastForward, servers
+ # This will dynamically update vBucketMap, vBucketMapFastForward,
+ # and servers
urlopener = urllib.FancyURLopener()
- urlopener.prompt_user_passwd = lambda host, realm: (self.rest_username, self.rest_password)
+        urlopener.prompt_user_passwd = (
+            lambda host, realm: (self.rest_username, self.rest_password))
current_servers = True
while current_servers:
self.servers_lock.acquire()
current_servers = deepcopy(self.servers)
self.servers_lock.release()
for server in current_servers:
- response = urlopener.open("http://%s:%s/pools/default/bucketsStreaming/%s" % (server["ip"], server["port"], self.bucket))
+ response = urlopener.open("http://%s:%s/pools/default/bucketsS"
+ "treaming/%s" % (server["ip"],
+ server["port"], self.bucket))
while response:
try:
line = response.readline()
@@ -691,11 +729,14 @@ def _start_streaming(self):
vbucketmapfastforward = {}
index = 0
if 'vBucketMapForward' in data['vBucketServerMap']:
- for vbucket in data['vBucketServerMap']['vBucketMapForward']:
- vbucketmapfastforward[index] = serverlist[vbucket[0]]
+ for vbucket in\
+ data['vBucketServerMap']['vBucketMapForward']:
+ vbucketmapfastforward[index] =\
+ serverlist[vbucket[0]]
index += 1
self._vBucketMapFastForward_lock.acquire()
- self._vBucketMapFastForward = deepcopy(vbucketmapfastforward)
+ self._vBucketMapFastForward =\
+ deepcopy(vbucketmapfastforward)
self._vBucketMapFastForward_lock.release()
vbucketmap = {}
index = 0
@@ -714,12 +755,14 @@ def _start_streaming(self):
new_servers = []
nodes = data["nodes"]
for node in nodes:
- if node["clusterMembership"] == "active" and node["status"] == "healthy":
- hostport = node["hostname"]
- new_servers.append({"ip":hostport.split(":")[0],
- "port":int(hostport.split(":")[1]),
- "username":self.rest_username,
- "password":self.rest_password})
+ if (node["clusterMembership"] == "active" and
+ node["status"] == "healthy"):
+ ip, port = node["hostname"].split(":")
+ new_servers.append({"ip": ip,
+                                            "port": int(port),
+ "username": self.rest_username,
+ "password": self.rest_password
+ })
new_servers.sort()
self.servers_lock.acquire()
self.servers = deepcopy(new_servers)
@@ -739,7 +782,9 @@ def start_vbucket_connection(self, vbucket):
self._vBucketMap_lock.release()
serverIp, serverPort = server.split(":")
if not server in self._memcacheds:
- self._memcacheds[server] = MemcachedClientHelper.direct_client(self.rest, serverIp, serverPort, self.bucket)
+ self._memcacheds[server] =\
+ MemcachedClientHelper.direct_client(self.rest, serverIp,
+ serverPort, self.bucket)
def start_vbucket_fastforward_connection(self, vbucket):
self._vBucketMapFastForward_lock.acquire()
@@ -750,7 +795,9 @@ def start_vbucket_fastforward_connection(self, vbucket):
self._vBucketMapFastForward_lock.release()
serverIp, serverPort = server.split(":")
if not server in self._memcacheds:
- self._memcacheds[server] = MemcachedClientHelper.direct_client(self.rest, serverIp, serverPort, self.bucket)
+ self._memcacheds[server] =\
+ MemcachedClientHelper.direct_client(self.rest, serverIp,
+ serverPort, self.bucket)
def restart_vbucket_connection(self, vbucket):
self._vBucketMap_lock.acquire()
@@ -759,16 +806,17 @@ def restart_vbucket_connection(self, vbucket):
serverIp, serverPort = server.split(":")
if server in self._memcacheds:
self._memcacheds[server].close()
- self._memcacheds[server] = MemcachedClientHelper.direct_client(self.rest, serverIp, serverPort, self.bucket)
+ self._memcacheds[server] =\
+ MemcachedClientHelper.direct_client(self.rest, serverIp,
+ serverPort, self.bucket)
def reconfig_vbucket_map(self, vbucket=-1):
vb_ready = RestHelper(self.rest).vbucket_map_ready(self.bucket, 60)
if not vb_ready:
- raise Exception("vbucket map is not ready for bucket %s" % (self.bucket))
+ raise Exception("vbucket map is not ready for bucket %s" %
+ (self.bucket))
vBuckets = self.rest.get_vbuckets(self.bucket)
self.vbucket_count = len(vBuckets)
- bucket_info = self.rest.get_bucket(self.bucket)
- nodes = bucket_info.nodes
self._vBucketMap_lock.acquire()
for vBucket in vBuckets:
@@ -785,7 +833,8 @@ def memcached(self, key, fastforward=False):
# only try the fastforward if we have an entry
# otherwise we just wait for the main map to update
self.start_vbucket_fastforward_connection(vBucketId)
- self._vBucketMap[vBucketId] = self._vBucketMapFastForward[vBucketId]
+ self._vBucketMap[vBucketId] =\
+ self._vBucketMapFastForward[vBucketId]
if vBucketId not in self._vBucketMap:
msg = "vbucket map does not have an entry for vb : %s"
@@ -838,50 +887,52 @@ def get(self, key):
def gat(self, key, expiry):
event = Event()
- item = {"operation": "gat", "key": key, "expiry": expiry, "event": event,
- "response": {}}
+ item = {"operation": "gat", "key": key, "expiry": expiry,
+ "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
def touch(self, key, expiry):
event = Event()
- item = {"operation": "touch", "key": key, "expiry": expiry, "event": event,
+ item = {"operation": "touch", "key": key, "expiry": expiry,
+ "event": event,
"response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
def cas(self, key, expiry, flags, old_value, value):
event = Event()
- item = {"operation": "cas", "key": key, "expiry": expiry, "flags": flags, "old_value": old_value, "value": value
- , "event": event, "response": {}}
+ item = {"operation": "cas", "key": key, "expiry": expiry,
+ "flags": flags, "old_value": old_value, "value": value,
+ "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
def decr(self, key, amount=1, init=0, expiry=0):
event = Event()
- item = {"operation": "decr", "key": key, "amount": amount, "init": init, "expiry": expiry, "event": event,
- "response": {}}
+ item = {"operation": "decr", "key": key, "amount": amount,
+ "init": init, "expiry": expiry, "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
def set(self, key, expiry, flags, value):
event = Event()
- item = {"operation": "set", "key": key, "expiry": expiry, "flags": flags, "value": value, "event": event,
- "response": {}}
+ item = {"operation": "set", "key": key, "expiry": expiry,
+ "flags": flags, "value": value, "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
def add(self, key, expiry, flags, value):
event = Event()
- item = {"operation": "add", "key": key, "expiry": expiry, "flags": flags, "value": value, "event": event,
- "response": {}}
+ item = {"operation": "add", "key": key, "expiry": expiry,
+ "flags": flags, "value": value, "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
def append(self, key, value, cas=0):
event = Event()
- item = {"operation": "append", "key": key, "cas": cas, "value": value, "event": event,
- "response": {}}
+ item = {"operation": "append", "key": key, "cas": cas, "value": value,
+ "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
@@ -894,29 +945,29 @@ def delete(self, key, cas=0):
def prepend(self, key, value, cas=0):
event = Event()
- item = {"operation": "prepend", "key": key, "cas": cas, "value": value, "event": event,
- "response": {}}
+ item = {"operation": "prepend", "key": key, "cas": cas, "value": value,
+ "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
def getl(self, key, expiry=15):
event = Event()
- item = {"operation": "getl", "key": key, "expiry": expiry, "event": event,
- "response": {}}
+ item = {"operation": "getl", "key": key, "expiry": expiry,
+ "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
def replace(self, key, expiry, flags, value):
event = Event()
- item = {"operation": "replace", "key": key, "expiry": expiry, "flags": flags, "value": value, "event": event,
- "response": {}}
+ item = {"operation": "replace", "key": key, "expiry": expiry,
+ "flags": flags, "value": value, "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
def incr(self, key, amount=1, init=0, expiry=0):
event = Event()
- item = {"operation": "incr", "key": key, "amount": amount, "init": init, "expiry": expiry, "event": event,
- "response": {}}
+ item = {"operation": "incr", "key": key, "amount": amount,
+ "init": init, "expiry": expiry, "event": event, "response": {}}
self.dispatcher.put(item)
return self._respond(item, event)
@@ -932,7 +983,8 @@ def __init__(self, vbaware, verbose=False):
self.vbaware = vbaware
self.reconfig_callback = self.vbaware.reconfig_vbucket_map
self.start_connection_callback = self.vbaware.start_vbucket_connection
- self.restart_connection_callback = self.vbaware.restart_vbucket_connection
+ self.restart_connection_callback =\
+ self.vbaware.restart_vbucket_connection
self.verbose = verbose
self.log = logger.logger("CommandDispatcher")
self._dispatcher_stopped_event = Event()
@@ -955,7 +1007,8 @@ def reconfig_completed(self):
self.status = "ok"
def dispatch(self):
- while self.status != "shutdown" or (self.status == "shutdown" and self.queue.qsize() > 0):
+ while (self.status != "shutdown" or (self.status == "shutdown" and
+ self.queue.qsize() > 0)):
#wait if its reconfiguring the vbucket-map
if self.status == "vbucketmap-configuration":
continue
@@ -995,7 +1048,8 @@ def _raise_if_recoverable(self, ex, item):
if isinstance(ex, MemcachedError) and ex.status == 7:
ex.vbucket = item["vbucket"]
print ex
- self.log.error("got not my vb error. key: %s, vbucket: %s" % (item["key"], item["vbucket"]))
+ self.log.error("got not my vb error. key: %s, vbucket: %s" %
+ (item["key"], item["vbucket"]))
raise ex
if isinstance(ex, exceptions.EOFError):
ex.vbucket = item["vbucket"]
@@ -1010,7 +1064,7 @@ def _raise_if_recoverable(self, ex, item):
item["response"]["error"] = ex
def do(self, item):
- #find which vbucket this belongs to and then run the operation on that ?
+ #find which vbucket this belongs to and then run the operation on that
if "key" in item:
item["vbucket"] = self.vbaware.vbucketid(item["key"])
if not "fastforward" in item:
@@ -1020,7 +1074,8 @@ def do(self, item):
if item["operation"] == "get":
key = item["key"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).get(key)
+ item["response"]["return"] =\
+ self.vbaware.memcached(key, item["fastforward"]).get(key)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
@@ -1030,7 +1085,9 @@ def do(self, item):
flags = item["flags"]
value = item["value"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).set(key, expiry, flags, value)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.set(key, expiry, flags,
+ value)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
@@ -1040,7 +1097,9 @@ def do(self, item):
flags = item["flags"]
value = item["value"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).add(key, expiry, flags, value)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.add(key, expiry, flags,
+ value)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
@@ -1050,15 +1109,18 @@ def do(self, item):
flags = item["flags"]
value = item["value"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).replace(key, expiry, flags, value)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.replace(key, expiry, flags,
+ value)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
elif item["operation"] == "delete":
key = item["key"]
cas = item["cas"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).delete(key, cas)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.delete(key, cas)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
@@ -1067,7 +1129,8 @@ def do(self, item):
cas = item["cas"]
value = item["value"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).prepend(key, value, cas)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.prepend(key, value, cas)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
@@ -1076,31 +1139,35 @@ def do(self, item):
cas = item["cas"]
value = item["value"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).append(key, value, cas)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.append(key, value, cas)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
elif item["operation"] == "getl":
key = item["key"]
expiry = item["expiry"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).getl(key, expiry)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.getl(key, expiry)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
elif item["operation"] == "gat":
key = item["key"]
expiry = item["expiry"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).gat(key, expiry)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.gat(key, expiry)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
elif item["operation"] == "touch":
key = item["key"]
expiry = item["expiry"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).touch(key, expiry)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.touch(key, expiry)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
@@ -1110,7 +1177,9 @@ def do(self, item):
init = item["init"]
expiry = item["expiry"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).incr(key, amount, init, expiry)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.incr(key, amount, init,
+ expiry)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
@@ -1120,7 +1189,9 @@ def do(self, item):
init = item["init"]
expiry = item["expiry"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).decr(key, amount, init, expiry)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.decr(key, amount, init,
+ expiry)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
@@ -1132,7 +1203,9 @@ def do(self, item):
old_value = item["old_value"]
value = item["value"]
try:
- item["response"]["return"] = self.vbaware.memcached(key, item["fastforward"]).cas(key, expiry, flags, old_value, value)
+ conn = self.vbaware.memcached(key, item["fastforward"])
+ item["response"]["return"] = conn.cas(key, expiry, flags,
+ old_value, value)
except Exception, ex:
self._raise_if_recoverable(ex, item)
item["event"].set()
@@ -1145,9 +1218,12 @@ def direct_client(rest, ip, port, bucket):
vBuckets = bucket_info.vbuckets
for node in bucket_info.nodes:
if node.ip == ip and node.memcached == int(port):
- client = MemcachedClient(ip.encode('ascii', 'ignore'), int(port))
+ client = MemcachedClient(ip.encode('ascii', 'ignore'),
+ int(port))
client.vbucket_count = len(vBuckets)
client.sasl_auth_plain(bucket_info.name.encode('ascii'),
- bucket_info.saslPassword.encode('ascii'))
+ bucket_info.saslPassword
+ .encode('ascii'))
return client
- raise Exception("unexpected error - unable to find ip:%s in this cluster" % (ip))
+ raise Exception(("unexpected error - unable to find ip:%s in this"
+ " cluster" % (ip)))
18 couchbase/exception.py
@@ -29,7 +29,8 @@ class CouchbaseHttpExceptionTypes(object):
class MemcachedTimeoutException(Exception):
def __init__(self, item, timeout):
- msg = "timeout - memcached did not return in %s second during %s operation for key %s"
+ msg = ("timeout - memcached did not return in %s second during %s"
+ " operation for key %s")
self._message = msg % (timeout, item["operation"], item["key"])
def __str__(self):
@@ -70,7 +71,8 @@ def __init__(self, ip='', bucket_name='', error=''):
self.parameters['host'] = ip
self.parameters['bucket'] = bucket_name
self.type = CouchbaseHttpExceptionTypes.BUCKET_CREATION_ERROR
- self._message = 'unable to create bucket %s on the host @ %s' % (bucket_name, ip)
+ self._message = ('unable to create bucket %s on the host @ %s' %
+ (bucket_name, ip))
if error:
self._message += ' due to error: ' + error
@@ -81,7 +83,8 @@ def __init__(self, ip='', bucket_name='', error=''):
self.parameters['host'] = ip
self.parameters['bucket'] = bucket_name
self.type = CouchbaseHttpExceptionTypes.BUCKET_UNAVAILABLE
- self._message = 'unable to find bucket %s on the host @ %s' % (bucket_name, ip)
+ self._message = ('unable to find bucket %s on the host @ %s' %
+ (bucket_name, ip))
if error:
self._message += ' due to error: ' + error
@@ -104,12 +107,14 @@ class InvalidArgumentException(CouchbaseHttpException):
def __init__(self, api, parameters):
self.parameters = parameters
self.api = api
- self._message = '%s failed when invoked with parameters: %s' % (self.api, self.parameters)
+ self._message = ('%s failed when invoked with parameters: %s' %
+ (self.api, self.parameters))
class ServerJoinException(CouchbaseHttpException):
def __init__(self, nodeIp='', remoteIp=''):
- self._message = 'node: %s already added to this cluster:%s' % (remoteIp, nodeIp)
+ self._message = ('node: %s already added to this cluster:%s' %
+ (remoteIp, nodeIp))
self.parameters = dict()
self.parameters['nodeIp'] = nodeIp
self.parameters['remoteIp'] = remoteIp
@@ -118,7 +123,8 @@ def __init__(self, nodeIp='', remoteIp=''):
class ServerAlreadyJoinedException(CouchbaseHttpException):
def __init__(self, nodeIp='', remoteIp=''):
- self._message = 'node: %s already added to this cluster:%s' % (remoteIp, nodeIp)
+ self._message = ('node: %s already added to this cluster:%s' %
+ (remoteIp, nodeIp))
self.parameters = dict()
self.parameters['nodeIp'] = nodeIp
self.parameters['remoteIp'] = remoteIp
4 couchbase/logger.py
@@ -17,13 +17,15 @@
import logging
+
def logger(name):
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
consoleHandler = logging.StreamHandler()
consoleHandler.setLevel(logging.DEBUG)
# create formatter
- formatter = logging.Formatter("[%(asctime)s] - [%(module)s] [%(thread)d] - %(levelname)s - %(message)s")
+ formatter = logging.Formatter("[%(asctime)s] - [%(module)s] [%(thread)d]"
+ " - %(levelname)s - %(message)s")
# add formatter to ch
consoleHandler.setFormatter(formatter)
# add ch to logger
2 couchbase/migrator/__init__.py
@@ -26,6 +26,7 @@
destinations.extend(migrator_dir.destinations)
destinations.extend(migrator_zip.destinations)
+
def reader(loc):
kind, fp = loc.split(':', 1)
if kind.lower() == 'csv':
@@ -41,6 +42,7 @@ def reader(loc):
elif kind.lower() == 'zip':
return ZipReader(fp)
+
def writer(loc):
kind, fp = loc.split(':', 1)
if kind.lower() == 'csv':
4 couchbase/migrator/migrator.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+
class Reader(object):
def __init__(self, fp):
pass
@@ -28,11 +29,12 @@ def next(self):
def close(self):
pass
+
class Writer(object):
def __init__(self, fp):
pass
- def write(self):
+ def write(self, record):
raise NotImplementedError
def close(self):
35 couchbase/migrator/migrator_couchbase.py
@@ -15,19 +15,17 @@
# limitations under the License.
#
-sources = [{'type':'couchbase', 'class':'CouchbaseReader', 'example':'couchbase://bucket:password@example.com:8091/bucket'}]
-destinations = [{'type':'couchbase', 'class':'CouchbaseWriter', 'example':'couchbase://bucket:password@example.com:8091/bucket'}]
+sources = [{'type':'couchbase', 'class':'CouchbaseReader',
+ 'example':'couchbase://bucket:password@example.com:8091/bucket'}]
+destinations = [{'type':'couchbase', 'class':'CouchbaseWriter',
+                 'example':
+                 'couchbase://bucket:password@example.com:8091/bucket'}]
-import re
-import json
-import uuid
from urlparse import urlparse
import urllib
-
import couchbase
-
import migrator
+
class CouchbaseReader(migrator.Reader):
def __init__(self, source):
# couchbase://username:password@example.com:8091/bucket
@@ -45,7 +43,9 @@ def __init__(self, source):
password=self.password)
self.bucket = cb[self.bucket_name]
- self.items = self.bucket.view('_all_docs', limit=self.page_limit + 1, stale=False, reduce=False, include_docs=True)
+ self.items = self.bucket.view('_all_docs', limit=self.page_limit + 1,
+ stale=False, reduce=False,
+ include_docs=True)
def __iter__(self):
return self
@@ -54,15 +54,24 @@ def next(self):
if len(self.items) < 1:
raise StopIteration()
elif len(self.items) == 1:
- next_startkey = urllib.quote_plus(self.items[0]['key'].replace('"', '\\"').encode('utf-8'))
- next_startkey_docid = urllib.quote_plus(self.items[0]['doc']['_id'].replace('"', '\\"').encode('utf-8'))
- self.items = self.bucket.view('_all_docs', limit=self.page_limit + 1, startkey=next_startkey, startkey_docid=next_startkey_docid, stale=False, reduce=False, include_docs=True)
+ next_startkey = (urllib.quote_plus(self.items[0]['key']
+ .replace('"', '\\"').encode('utf-8')))
+ next_startkey_docid = urllib.quote_plus(self.items[0]['doc']['_id']
+ .replace('"', '\\"')
+ .encode('utf-8'))
+ self.items = self.bucket.view('_all_docs',
+ limit=self.page_limit + 1,
+ startkey=next_startkey,
+ startkey_docid=next_startkey_docid,
+ stale=False, reduce=False,
+ include_docs=True)
data = self.items.pop(0)
else:
data = self.items.pop(0)
- record = {'id':data['doc']['_id']}
- record['value'] = dict((k, v) for (k, v) in data['doc'].iteritems() if not k.startswith('$'))
+ record = {'id': data['doc']['_id']}
+ record['value'] = (dict((k, v) for (k, v) in data['doc'].iteritems()
+ if not k.startswith('$')))
return record
28 couchbase/migrator/migrator_couchdb.py
@@ -15,12 +15,12 @@
# limitations under the License.
#
-sources = [{'type':'couchdb', 'class':'CouchdbReader', 'example':'couchdb://example.com:5984/database'}]
-destinations = [{'type':'couchdb', 'class':'CouchdbWriter', 'example':'couchdb://example.com:5984/database'}]
+sources = [{'type':'couchdb', 'class':'CouchdbReader',
+ 'example':'couchdb://example.com:5984/database'}]
+destinations = [{'type':'couchdb', 'class':'CouchdbWriter',
+ 'example':'couchdb://example.com:5984/database'}]
-import re
-import json
-import urllib
+import migrator
from urlparse import urlparse
try:
@@ -29,7 +29,6 @@
sources = []
destinations = []
-import migrator
class CouchdbReader(migrator.Reader):
def __init__(self, source):
@@ -44,7 +43,8 @@ def __init__(self, source):
self.couch = couchdb.Server('http://%s:%s' % (self.host, self.port))
self.db = self.couch[self.database]
- self.items = list(self.db.view('_all_docs', limit=self.page_limit + 1, include_docs=True))
+ self.items = list(self.db.view('_all_docs', limit=self.page_limit + 1,
+ include_docs=True))
def __iter__(self):
return self
@@ -53,13 +53,19 @@ def next(self):
if len(self.items) < 1:
raise StopIteration()
elif len(self.items) == 1:
- next_startkey = self.items[0]['key'].replace('"', '\\"').encode('utf-8')
- next_startkey_docid = self.items[0]['key'].replace('"', '\\"').encode('utf-8')
- self.items = list(self.db.view('_all_docs', limit=self.page_limit + 1, startkey=next_startkey, startkey_docid=next_startkey_docid, include_docs=True))
+ next_startkey = (self.items[0]['key'].replace('"', '\\"')
+ .encode('utf-8'))
+ next_startkey_docid = (self.items[0]['key'].replace('"', '\\"')
+ .encode('utf-8'))
+ self.items = list(self.db.view('_all_docs',
+ limit=self.page_limit + 1,
+ startkey=next_startkey,
+ startkey_docid=next_startkey_docid,
+ include_docs=True))
data = self.items.pop(0)
- record = {'id':data['doc']['_id']}
+ record = {'id': data['doc']['_id']}
record['value'] = dict((k, v) for (k, v) in data['doc'].iteritems())
return record
7 couchbase/migrator/migrator_csv.py
@@ -19,9 +19,9 @@
destinations = []
import csv
-
import migrator
+
class CSVReader(migrator.Reader):
def __init__(self, source):
if source[0:2] == "//":
@@ -35,8 +35,9 @@ def __iter__(self):
def next(self):
data = self.reader.next()
if data:
- record = {'id':data['id']}
- record['value'] = dict((k, v) for (k, v) in data.iteritems() if k != 'id' and not k.startswith('_'))
+ record = {'id': data['id']}
+ record['value'] = (dict((k, v) for (k, v) in data.iteritems()
+ if k != 'id' and not k.startswith('_')))
return record
else:
raise StopIteration()
31 couchbase/migrator/migrator_dir.py
@@ -18,22 +18,24 @@
# source *.json files in a directory into destination
# source:
-# recurses through subdirectories and reads .json files
-# if the .json has an _id field it will use that, otherwise it will use the filename
-# if the directory contains an _id file, the directory itself will be considered a document and all files (except *.json) will be considered json data
+# recurses through subdirectories and reads .json files. If the .json has an
+# _id field it will use that, otherwise it will use the filename. If the
+# directory contains an _id file, the directory itself will be considered a
+# document and all files (except *.json) will be considered json data
# destination:
# writes out items that have _id: _design/* to <dir>/design_docs
# writes out all other items to <dir>/docs
sources = [{'type':'dir', 'class':'DirReader', 'example':'dir://<directory>'}]
-destinations = [{'type':'dir', 'class':'DirWriter', 'example':'dir://<directory>'}]
+destinations = [{'type':'dir', 'class':'DirWriter',
+ 'example':'dir://<directory>'}]
import os
import json
-
import migrator
+
class DirReader(migrator.Reader):
def __init__(self, source):
if source[0:2] == "//":
@@ -65,7 +67,6 @@ def _get_nonjson_filenames(self, base):
def next(self):
try:
filename = self.files.pop(0)
- path = os.path.join(self.dir, filename)
except IndexError:
raise StopIteration()
@@ -78,10 +79,12 @@ def next(self):
json_data = {}
for f in self._get_nonjson_filenames(os.path.dirname(filename)):
f_relative_path = f[len(doc_basepath) + 1:]
- if os.path.isfile(os.path.join(doc_basepath, f_relative_path.split(os.path.sep)[0])):
+            if os.path.isfile(os.path.join(
+                    doc_basepath, f_relative_path.split(os.path.sep)[0])):
# its a plain file
- with open (f) as item:
- json_data[f_relative_path.split(os.path.sep)[0]] = item.read().strip('\n\r,')
+ with open(f) as item:
+ json_data[f_relative_path.split(os.path.sep)[0]] =\
+ item.read().strip('\n\r,')
else:
# its a subdirectory
json_cur = json_data
@@ -90,7 +93,8 @@ def next(self):
json_cur[index] = {}
json_cur = json_cur[index]
- index = os.path.splitext(f_relative_path.split(os.path.sep)[-1])[0]
+ index = os.path.splitext(f_relative_path
+ .split(os.path.sep)[-1])[0]
with open(f) as item:
json_cur[index] = item.read().strip('\n\r,')
else:
@@ -101,8 +105,9 @@ def next(self):
else:
id = os.path.splitext(os.path.basename(filename))[0]
- record = {'id':id}
- record['value'] = dict((k, v) for (k, v) in json_data.iteritems() if not k.startswith('_'))
+ record = {'id': id}
+ record['value'] = (dict((k, v) for (k, v) in json_data.iteritems()
+ if not k.startswith('_')))
return record
@@ -114,7 +119,7 @@ def __init__(self, destination):
try:
os.makedirs(os.path.join(destination, "docs"))
os.makedirs(os.path.join(destination, "design_docs"))
- except OSError as e:
+ except OSError:
pass
def write(self, record):
17 couchbase/migrator/migrator_json.py
@@ -14,14 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-sources = [{'type':'json', 'class':'JSONReader', 'example':'json://<filename>'}]
-destinations = [{'type':'json', 'class':'JSONWriter', 'example':'json://<filename>'}]
-
import json
-
import migrator
+sources = [{'type':'json', 'class':'JSONReader',
+ 'example':'json://<filename>'}]
+destinations = [{'type':'json', 'class':'JSONWriter',
+ 'example':'json://<filename>'}]
+
+
class JSONReader(migrator.Reader):
def __init__(self, source):
if source[0:2] == "//":
@@ -38,8 +39,10 @@ def next(self):
json_data = json.loads(data.strip('\n\r,'))
except ValueError:
raise StopIteration()
- record = {'id':json_data['id']}
- record['value'] = dict((k, v) for (k, v) in json_data['value'].iteritems() if not k.startswith('_'))
+ record = {'id': json_data['id']}
+ record['value'] = (dict((k, v) for (k, v) in
+ json_data['value'].iteritems()
+ if not k.startswith('_')))
return record
else:
raise StopIteration()
17 couchbase/migrator/migrator_zip.py
@@ -17,18 +17,19 @@
# source *.json files from a zip file into destination
-sources = [{'type':'zip', 'class':'ZipReader', 'example':'zip://<zipfile>'}]
-destinations = [{'type':'zip', 'class':'ZipWriter', 'example':'zip://<zipfile>'}]
-
import os
import json
import zipfile
import tempfile
import shutil
-import migrator
from migrator_dir import DirReader, DirWriter
+sources = [{'type':'zip', 'class':'ZipReader', 'example':'zip://<zipfile>'}]
+destinations = [{'type':'zip', 'class':'ZipWriter',
+ 'example':'zip://<zipfile>'}]
+
+
class ZipReader(DirReader):
def __init__(self, source):
if source[0:2] == "//":
@@ -41,14 +42,16 @@ def __init__(self, source):
DirReader.__init__(self, self.tempdir)
def close(self):
- shutil.rmtree(self.tempdir)
- self.zipfile.close()
+ shutil.rmtree(self.tempdir)
+ self.zipfile.close()
+
class ZipWriter(DirWriter):
def __init__(self, destination):
if destination[0:2] == "//":
destination = destination[2:]
- self.zipfile = zipfile.ZipFile(os.path.expanduser(destination), "w", zipfile.ZIP_DEFLATED)
+ self.zipfile = zipfile.ZipFile(os.path.expanduser(destination), "w",
+ zipfile.ZIP_DEFLATED)
self.dirname = os.path.splitext(os.path.basename(destination))[0]
self.tempdir = tempfile.mkdtemp()
194 couchbase/rest_client.py
@@ -17,16 +17,17 @@
import base64
try:
- import json
+ import json
except:
- import simplejson as json
+ import simplejson as json
import urllib
import httplib2
import socket
import time
import logger
-from exception import ServerAlreadyJoinedException, ServerUnavailableException, InvalidArgumentException
-from exception import BucketCreationException, ServerJoinException, BucketUnavailableException
+from exception import ServerAlreadyJoinedException,\
+ ServerUnavailableException, InvalidArgumentException,\
+ BucketCreationException, ServerJoinException, BucketUnavailableException
log = logger.logger("rest_client")
@@ -57,11 +58,12 @@ def rebalance_reached(self, percentage=100):
start = time.time()
progress = 0
retry = 0
- while progress is not -1 and progress <= percentage and retry < 20:
+ while progress != -1 and progress <= percentage and retry < 20:
#-1 is error , -100 means could not retrieve progress
progress = self.rest._rebalance_progress()
if progress == -100:
- log.error("unable to retrieve rebalanceProgress.try again in 2 seconds")
+ log.error("unable to retrieve rebalanceProgress.try again in"
+ " 2 seconds")
retry += 1
else:
retry = 0
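
The `progress is not -1` comparison replaced above only behaved because CPython interns small integers; is tests object identity, not equality, so != is the correct operator against a literal:

    progress = int('-1')
    progress != -1        # False: the values are equal
    progress is -1        # True in CPython, but only because -1 is interned
    int('1000') is 1000   # may be False: identity, not equality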
@@ -71,7 +73,8 @@ def rebalance_reached(self, percentage=100):
return False
else:
duration = time.time() - start
- log.info('rebalance reached >%s percent in %s seconds ' % (progress, duration))
+ log.info('rebalance reached >%s percent in %s seconds ' %
+ (progress, duration))
return True
def is_cluster_rebalanced(self):
@@ -121,7 +124,8 @@ def wait_for_node_status(self, node, expected_status, timeout_in_seconds):
status_reached = True
break
if not status_reached:
- log.info("sleep for 5 seconds before reading the node.status again")
+ log.info("sleep for 5 seconds before reading the node.status"
+ " again")
time.sleep(5)
log.info('node %s status_reached : %s' % (node.id, status_reached))
return status_reached
@@ -134,7 +138,8 @@ def wait_for_replication(self, timeout_in_seconds=120):
break
wait_count += 1
if wait_count == 10:
- log.info('replication state : %s' % (self.all_nodes_replicated(debug=True)))
+ log.info('replication state : %s' %
+ (self.all_nodes_replicated(debug=True)))
wait_count = 0
time.sleep(5)
log.info('replication state : %s' % (self.all_nodes_replicated()))
@@ -145,7 +150,8 @@ def all_nodes_replicated(self, debug=False):
nodes = self.rest.node_statuses()
for node in nodes:
if debug:
- log.info("node %s replication state : %s" % (node.id, node.replication))
+ log.info("node %s replication state : %s" %
+ (node.id, node.replication))
if node.replication != 1.0:
replicated = False
return replicated
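
A note on the status checks in the hunks that follow: _http_request returns True on HTTP 200/201/202 and an error string otherwise (see its hunk further down), so the result must be compared against True with is/is not rather than tested for bare truthiness, because a non-empty error string is truthy:

    status = 'bad request'   # illustrative reason string from the error path
    not status == True       # True  -> the original check raises
    not status               # False -> bare truthiness would not
    status is not True       # True  -> preserves the original behavior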
@@ -171,16 +177,17 @@ def create_design_doc(self, bucket, design_doc, function):
api = self.couch_api_base + '%s/_design/%s' % (bucket, design_doc)
#check if this view exists and update the rev
- status, content = self._http_request(api, 'PUT', function, headers=self._create_capi_headers())
+ headers = self._create_capi_headers()
+ status, content = self._http_request(api, 'PUT', function,
+ headers=headers)
json_parsed = json.loads(content)
- if not status == True:
+        if status is not True:
raise Exception("unable to create design doc")
return json_parsed
- #http://10.1.6.108:8091/bucket-0/_design/dev_6ca50/_view/dev_6ca50?limit=10&_=1311107815807
def view_results(self, bucket, design_doc, view, params, limit=100):
if view:
view_query = '{0}/_design/{1}/_view/{2}'
@@ -205,35 +212,40 @@ def view_results(self, bucket, design_doc, view, params, limit=100):
else:
api += "%s=%s" % (param, params[param])
- status, content = self._http_request(api, headers=self._create_capi_headers())
+ headers = self._create_capi_headers()
+ status, content = self._http_request(api, headers=headers)
json_parsed = json.loads(content)
- if not status == True:
- raise Exception("unable to obtain view results for " + api + "\n" + `status` + "\n" + content)
+        if status is not True:
+ raise Exception("unable to obtain view results for " + api + "\n"
+ + repr(status) + "\n" + content)
return json_parsed
def get_design_doc(self, bucket, design_doc):
api = self.couch_api_base + '%s/_design/%s' % (bucket, design_doc)
- status, content = self._http_request(api, headers=self._create_capi_headers())
+ headers = self._create_capi_headers()
+ status, content = self._http_request(api, headers=headers)
json_parsed = json.loads(content)
- if not status == True:
+        if status is not True:
raise Exception("unable to get design doc")
return json_parsed
def get_view(self, bucket, design_doc, view):
- api = self.couch_api_base + '%s/_design/%s/_view/%s' % (bucket, design_doc, view)
+ api = self.couch_api_base + ('%s/_design/%s/_view/%s' %
+ (bucket, design_doc, view))
- status, content = self._http_request(api, headers=self._create_capi_headers())
+ headers = self._create_capi_headers()
+ status, content = self._http_request(api, headers=headers)
json_parsed = json.loads(content)
- if not status == True:
+        if status is not True:
raise Exception("unable to get view")
return json_parsed
@@ -245,10 +257,11 @@ def delete_design_doc(self, bucket, design_doc):
#pass in the rev
api = api + "?rev=%s" % (rev)
- status, content = self._http_request(api, 'DELETE', headers=self._create_capi_headers())
+ headers = self._create_capi_headers()
+ status, content = self._http_request(api, 'DELETE', headers=headers)
json_parsed = json.loads(content)
- if not status == True:
+        if status is not True:
raise Exception("unable to delete the design doc")
@@ -263,18 +276,21 @@ def _create_headers(self):
if self.username == "default":
return {'Content-Type': 'application/json', 'Accept': '*/*'}
else:
- authorization = base64.encodestring('%s:%s' % (self.username, self.password))
+ authorization = base64.encodestring('%s:%s' % (self.username,
+ self.password))
return {'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': 'Basic %s' % authorization,
'Accept': '*/*'}
- def _http_request(self, api, method='GET', params='', headers=None, timeout=120):
+ def _http_request(self, api, method='GET', params='', headers=None,
+ timeout=120):
if not headers:
headers = self._create_headers()
end_time = time.time() + timeout
while True:
try:
- response, content = httplib2.Http().request(api, method, params, headers)
+ response, content = httplib2.Http().request(api, method,
+ params, headers)
if response['status'] in ['200', '201', '202']:
return True, content
else:
@@ -288,10 +304,12 @@ def _http_request(self, api, method='GET', params='', headers=None, timeout=120)
reason = json_parsed["error"]
status = reason
elif "errors" in json_parsed:
- errors = [error for _, error in json_parsed["errors"].iteritems()]
+ errors = [error for _, error in
+ json_parsed["errors"].iteritems()]
reason = ", ".join(errors)
status = reason
- log.error('%s error %s reason: %s %s' % (api, response['status'], reason, content))
+ log.error('%s error %s reason: %s %s' %
+ (api, response['status'], reason, content))
return status, content
except socket.error, e:
log.error(e)
@@ -312,7 +330,7 @@ def init_cluster(self, username='Administrator', password='password'):
log.info('settings/web params : %s' % (params))
status, content = self._http_request(api, 'POST', params)
- if not status == True:
+        if status is not True:
return False
return status
@@ -325,7 +343,7 @@ def init_cluster_port(self, username='Administrator', password='password'):
log.info('settings/web params : %s' % (params))
status, content = self._http_request(api, 'POST', params)
- if not status == True:
+        if status is not True:
return False
return status
@@ -340,7 +358,7 @@ def init_cluster_memoryQuota(self, username='Administrator',
log.info('pools/default params : %s' % (params))
status, content = self._http_request(api, 'POST', params)
- if not status == True:
+        if status is not True:
return False
return status
@@ -362,17 +380,19 @@ def add_node(self, user='', password='', remoteIp='', port='8091'):
status, content = self._http_request(api, 'POST', params)
- if status == True:
+        if status is True:
json_parsed = json.loads(content)
otpNodeId = json_parsed['otpNode']
otpNode = OtpNode(otpNodeId)
if otpNode.ip == '127.0.0.1':
otpNode.ip = self.ip
else:
- if content.find('Prepare join failed. Node is already part of cluster') >= 0:
+ if content.find('Prepare join failed. Node is already part of'
+ ' cluster') >= 0:
raise ServerAlreadyJoinedException(nodeIp=self.ip,
remoteIp=remoteIp)
- elif content.find('Prepare join failed. Joining node to itself is not allowed') >= 0:
+ elif content.find('Prepare join failed. Joining node to itself is'
+ ' not allowed') >= 0:
raise ServerJoinException(nodeIp=self.ip,
remoteIp=remoteIp)
else:
@@ -394,10 +414,11 @@ def eject_node(self, user='', password='', otpNode=None):
status, content = self._http_request(api, 'POST', params)
- if status == True:
+        if status is True:
log.info('ejectNode successful')
else:
- if content.find('Prepare join failed. Node is already part of cluster') >= 0:
+ if content.find('Prepare join failed. Node is already part of'
+ ' cluster') >= 0:
raise ServerAlreadyJoinedException(nodeIp=self.ip,
remoteIp=otpNode)
else:
@@ -415,12 +436,12 @@ def fail_over(self, otpNode=None):
status, content = self._http_request(api, 'POST', params)
- if status == True:
+        if status is True:
log.info('fail_over successful')
else:
log.error('fail_over error : %s' % (content))
- if not status == True:
+        if status is not True:
return False
return status
@@ -452,27 +473,28 @@ def rebalance(self, otpNodes, ejectedNodes):
status, content = self._http_request(api, 'POST', params)
- if status == True:
+        if status is True:
log.info('rebalance operation started')
else:
log.error('rebalance operation failed')
#extract the error
raise InvalidArgumentException('controller/rebalance',
parameters=params)
- if not status == True:
+        if status is not True:
return False
return status
def monitorRebalance(self):
start = time.time()
progress = 0
retry = 0
- while progress is not -1 and progress is not 100 and retry < 20:
+ while progress != -1 and progress != 100 and retry < 20:
#-1 is error , -100 means could not retrieve progress
progress = self._rebalance_progress()
if progress == -100:
- log.error("unable to retrieve rebalanceProgress.try again in 2 seconds")
+ log.error("unable to retrieve rebalanceProgress.try again in"
+ "2 seconds")
retry += 1
else:
retry = 0
@@ -496,7 +518,7 @@ def _rebalance_progress(self):
json_parsed = json.loads(content)
- if status == True:
+        if status is True:
if "status" in json_parsed:
if "errorMessage" in json_parsed:
log.error('%s - rebalance failed' % (json_parsed))
@@ -505,7 +527,8 @@ def _rebalance_progress(self):
if key.find('@') >= 0:
ns_1_dictionary = json_parsed[key]
percentage = ns_1_dictionary['progress'] * 100
- log.info('rebalance percentage : %s percent' % (percentage))
+ log.info('rebalance percentage : %s percent' %
+ (percentage))
break
if percentage == -1:
percentage = 0
@@ -526,7 +549,7 @@ def rebalance_statuses(self):
json_parsed = json.loads(content)
- if status == True:
+        if status is True:
rebalanced = json_parsed['balanced']
return rebalanced
@@ -536,7 +559,7 @@ def log_client_error(self, post):
status, content = self._http_request(api, 'POST', post)
- if not status == True:
+        if status is not True:
log.error('unable to logClientError')
#returns node data for this host
@@ -548,7 +571,7 @@ def get_nodes_self(self):
json_parsed = json.loads(content)
- if status == True:
+        if status is True:
node = RestParser().parse_get_nodes_response(json_parsed)
return node
@@ -561,7 +584,7 @@ def node_statuses(self):
json_parsed = json.loads(content)
- if status == True:
+        if status is True:
for key in json_parsed:
#each key contain node info
value = json_parsed[key]
@@ -584,7 +607,7 @@ def cluster_status(self):
json_parsed = json.loads(content)
- if status == True:
+        if status is True:
parsed = json_parsed
return parsed
@@ -597,7 +620,7 @@ def get_pools_info(self):
json_parsed = json.loads(content)