Skip to content

Commit

Permalink
auto-discover number of vbuckets from the server instead of using 1024
Browse files Browse the repository at this point in the history
  • Loading branch information
farshidce committed Jul 18, 2011
1 parent 3df4dc7 commit 6ee1d48
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions pymembase/membaseclient.py
Expand Up @@ -222,6 +222,7 @@ def __init__(self, host='127.0.0.1', port=11211):
self.s.connect_ex((host, port))
self.r = random.Random()
self.log = logger.logger("MemcachedClient")
self.vbucket_count = 1024

def close(self):
self.s.close()
Expand Down Expand Up @@ -311,7 +312,7 @@ def decr(self, key, amt=1, init=0, exp=0):
def set(self, key, exp, flags, val, vbucket=-1):
"""Set a value in the memcached server."""
if vbucket == -1:
self.vbucketId = crc32.crc32_hash(key) & 1023
self.vbucketId = crc32.crc32_hash(key) & (self.vbucket_count - 1)
else:
self.vbucketId = vbucket
return self._mutate(MemcachedConstants.CMD_SET, key, exp, flags, 0, val)
Expand All @@ -332,7 +333,7 @@ def __parseGet(self, data, klen=0):
def get(self, key, vbucket=-1):
"""Get the value for a given key within the memcached server."""
if vbucket == -1:
self.vbucketId = crc32.crc32_hash(key) & 1023
self.vbucketId = crc32.crc32_hash(key) & (self.vbucket_count - 1)
else:
self.vbucketId = vbucket
parts = self._doCmd(MemcachedConstants.CMD_GET, key, '')
Expand Down Expand Up @@ -484,7 +485,7 @@ def noop(self):
def delete(self, key, cas=0, vbucket=-1):
"""Delete the value for a given key within the memcached server."""
if vbucket == -1:
self.vbucketId = crc32.crc32_hash(key) & 1023
self.vbucketId = crc32.crc32_hash(key) & (self.vbucket_count - 1)
return self._doCmd(MemcachedConstants.CMD_DELETE, key, '', '', cas)

def flush(self, timebomb=0):
Expand Down Expand Up @@ -602,6 +603,7 @@ def __init__(self, server, bucket):
self.dispatcher = CommandDispatcher(self)
self.dispatcher_thread = Thread(name="dispatcher-thread", target=self._start_dispatcher)
self.dispatcher_thread.start()
self.vbucket_count = 1024
#kick off dispatcher

def _start_dispatcher(self):
Expand All @@ -615,6 +617,7 @@ def __init__vbucket_map(self, rest, bucket, vbucket=-1):
if not vb_ready:
raise Exception("vbucket map is not ready for bucket {0}".format(bucket))
vBuckets = rest.get_vbuckets(bucket)
self.vbucket_count = len(vBuckets)
nodes = rest.get_nodes()
if vbucket == -1:
for vBucket in vBuckets:
Expand Down Expand Up @@ -656,7 +659,7 @@ def __init__vbucket_map(self, rest, bucket, vbucket=-1):
raise ex

def memcached(self, key):
vBucketId = crc32.crc32_hash(key) & 1023
vBucketId = crc32.crc32_hash(key) & (len(self._vBucketMap) - 1)
if vBucketId not in self._vBucketMap:
msg = "vbucket map does not have an entry for vb : {0}"
raise Exception(msg.format(vBucketId))
Expand Down Expand Up @@ -942,6 +945,9 @@ def direct_client(server, bucket):
else:
ip = server.ip
client = MemcachedClient(ip, node.memcached)
#set the vbucket count here ?
vBuckets = RestConnection(server).get_vbuckets(bucket)
client.vbucket_count = len(vBuckets)
bucket_info = RestConnection(server).get_bucket(bucket)
#todo raise exception for not bucket_info
client.sasl_auth_plain(bucket_info.name.encode('ascii'),
Expand All @@ -957,6 +963,8 @@ def proxy_client(server, bucket):
for node in nodes:
if node.ip == server.ip and int(node.port) == int(server.port):
client = MemcachedClient(server.ip, node.moxi)
vBuckets = RestConnection(server).get_vbuckets(bucket)
client.vbucket_count = len(vBuckets)
if bucket_info.authType == "sasl":
client.sasl_auth_plain(bucket_info.name.encode('ascii'),
bucket_info.saslPassword.encode('ascii'))
Expand Down

0 comments on commit 6ee1d48

Please sign in to comment.