
Add a ConsistentHashingRouter

Summary:
When asked for a DB, the ConsistentHashingRouter looks up the requested key
in a consistent hash.  The ConsistentHashingRouter has no notion of
connection status or availability, so it can hand back DB numbers that are
not currently available.

If a client receives a DB number from ConsistentHashingRouter and that DB is
unavailable for some reason, the client can re-request a db_num from the
router, passing a retry_for= kwarg that tells the router the DB number
provided in retry_for was unavailable and should be marked as down.
ConsistentHashingRouter will then take that server out of the hash and
return a new db_num in response.  If no hosts remain online, the router will
raise a HostListExhaused exception.
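
For illustration, the client-side retry flow looks roughly like this (a
sketch only; in this commit the equivalent logic lives in
Cluster._retryable_execute, shown in the diff below):

    # Ask the router for a DB number and retry on failure (sketch).
    db_nums = router.get_db(cluster, 'get_conn', key=key)
    while db_nums:
        connection = cluster[db_nums[0]]
        try:
            return getattr(connection, attr)(*args, **kwargs)
        except tuple(connection.retryable_exceptions):
            # Report the dead DB: the router drops the host from the hash
            # and hands back a fresh db_num, or raises HostListExhaused if
            # every host has been marked down.
            db_nums = router.get_db(cluster, 'get_conn', key=key,
                                    retry_for=db_nums[0])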

To get hosts back into the hash, the router flushes all down connections at
an interval based on the number of calls to get_db().  When that threshold
is crossed (currently 100,000 requests), the router flushes all connections
marked as down and then returns the requested host.  That host may still be
down, and if so the client may mark it down again with the retry_for kwarg.
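
Condensed, the bookkeeping on each call amounts to the following (a sketch
of _handle_host_retries from the diff below):

    self._get_db_attempts += 1
    if self._get_db_attempts > self.attempt_reconnect_threshold:  # 100,000
        self.flush_down_connections()  # put all down hosts back in the hash
        self._get_db_attempts = 0
    if retry_for is not None:
        self._mark_connection_as_down(cluster[retry_for])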

A few other approaches to tracking host availability were considered and
rejected:

1. Have the client manage host availability by telling the router to remove
or add servers.  This was deemed too complicated an interface for clients,
since the notion of adding or removing hosts is unique to this router.
2. Have the router check host availability on every request, by letting
clients define a callable that the router invokes whenever get_db() is
called, guaranteeing a working host on every call.  The overhead of checking
connectivity on every request was deemed too high.
3. Have the router check host availability through the callable (from #2
above) "every so often."  This was the most viable of the rejected options,
but it was decided that having the router do more than just routing, in this
case checking connectivity, increased coupling too much.  Plus, since the
router would still give back DB numbers that may not be connected, the
client still has to understand connection errors and take the appropriate
action - so it's not a huge win.

Maniphest T117

Test Plan: Check out the newly added tests and also run the whole test suite.
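
For reference, wiring a cluster up with the new router might look like the
following (a sketch; the Redis connection constructor arguments here are
assumptions for illustration, not part of this diff):

    from nydus.db import Cluster
    from nydus.db.backends.redis import Redis
    from nydus.db.routers.redis import ConsistentHashingRouter

    cluster = Cluster(
        hosts={0: Redis(num=0, port=6379), 1: Redis(num=1, port=6380)},
        router=ConsistentHashingRouter(),  # one instance per cluster
        max_connection_retries=20,         # new kwarg from this commit
    )
    cluster.set('foo', 'bar')  # routed through the consistent hash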

Differential Revision: http://phabricator.local.disqus.net/D395
Commit 08cacd40d3e9cf109ef57302f029820b2d48b496 (parent 11c8565), committed
by Hudson with Fluxx on Jan 17, 2012
@@ -1 +1,3 @@
-http://github.com/disqus/nydus/contributors
+http://github.com/disqus/nydus/contributors
+
+Ketama consistent hash algorithm for python written by Andrey Nikishaev: https://github.com/creotiv/PyKetama
@@ -0,0 +1,163 @@
+"""
+ Ketama consistent hash algorithm.
+
+ Rewrited from the original source: http://www.audioscrobbler.net/development/ketama/
+
+"""
+__author__ = "Andrey Nikishaev"
+__email__ = "creotiv@gmail.com"
+__version__ = 0.1
+__status__ = "productrion"
+
+__all__ = ['Ketama']
+
+import hashlib
+import math
+from bisect import bisect
+
+class Ketama(object):
+
+ def __init__(self, nodes=None, weights=None):
+ """
+ nodes - List of nodes(strings)
+ weights - Dictionary of node wheights where keys are nodes names.
+ if not set, all nodes will be equal.
+ """
+ self._hashring = dict()
+ self._sorted_keys = []
+
+ self._nodes = nodes
+ self._weights = weights if weights else {}
+
+ self._build_circle()
+
+ def _build_circle(self):
+ """
+ Creates hash ring.
+ """
+ total_weight = 0
+ for node in self._nodes:
+ total_weight += self._weights.get(node, 1)
+
+ for node in self._nodes:
+ weight = self._weights.get(node,1)
+
+ ks = math.floor((40*len(self._nodes) * weight) / total_weight);
+
+ for i in xrange(0, int(ks)):
+ b_key = self._md5_digest( '%s-%s-salt' % (node, i) )
+
+ for l in xrange(0, 4):
+ key = (( b_key[3+l*4] << 24 )
+ | ( b_key[2+l*4] << 16 )
+ | ( b_key[1+l*4] << 8 )
+ | b_key[l*4] )
+
+ self._hashring[key] = node
+ self._sorted_keys.append(key)
+
+ self._sorted_keys.sort()
+
+ def _get_node_pos(self, key):
+ """
+ Return node position(integer) for a given key. Else return None
+ """
+ if not self._hashring:
+ return None
+
+ key = self._gen_key(key)
+
+ nodes = self._sorted_keys
+ pos = bisect(nodes, key)
+
+ if pos == len(nodes):
+ return 0
+ return pos
+
+ def _gen_key(self, key):
+ """
+ Return long integer for a given key, that represent it place on
+ the hash ring.
+ """
+ b_key = self._md5_digest(key)
+ return self._hashi(b_key, lambda x: x)
+
+ def _hashi(self, b_key, fn):
+ return (( b_key[fn(3)] << 24 )
+ | ( b_key[fn(2)] << 16 )
+ | ( b_key[fn(1)] << 8 )
+ | b_key[fn(0)] )
+
+ def _md5_digest(self, key):
+ return map(ord, hashlib.md5(key).digest())
+
+ def remove_node(self,node):
+ """
+ Removes node from circle and rebuild it.
+ """
+ try:
+ self._nodes.remove(node)
+ del self._weights[node]
+ except KeyError,e:
+ pass
+ except ValueError,e:
+ pass
+ self._hashring = dict()
+ self._sorted_keys = []
+
+ self._build_circle()
+
+ def add_node(self,node,weight=1):
+ """
+ Adds node to circle and rebuild it.
+ """
+ self._nodes.append(node)
+ self._weights[node] = weight
+ self._hashring = dict()
+ self._sorted_keys = []
+
+ self._build_circle()
+
+ def get_node(self, key):
+ """
+ Return node for a given key. Else return None.
+ """
+ pos = self._get_node_pos(key)
+ if pos is None:
+ return None
+ return self._hashring[ self._sorted_keys[pos] ]
+
+
+if __name__ == '__main__':
+ def test(k):
+ data = {}
+ for i in xrange(REQUESTS):
+ tower = k.get_node('a'+str(i))
+ data.setdefault(tower,0)
+ data[tower] += 1
+ print 'Number of caches on each node: '
+ print data
+ print ''
+
+ print k.get_node('Aplple');
+ print k.get_node('Hello');
+ print k.get_node('Data');
+ print k.get_node('Computer');
+
+ NODES = ['192.168.0.1:6000','192.168.0.1:6001','192.168.0.1:6002',
+ '192.168.0.1:6003','192.168.0.1:6004','192.168.0.1:6005',
+ '192.168.0.1:6006','192.168.0.1:6008','192.168.0.1:6007'
+ ]
+ REQUESTS = 1000
+
+ k = Ketama(NODES)
+
+ test(k)
+
+ k.remove_node('192.168.0.1:6007')
+
+ test(k)
+
+ k.add_node('192.168.0.1:6007')
+
+ test(k)
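
A quick illustration of the Ketama API added above (Python 2, matching the
module; the node names and weights are arbitrary examples):

    from nydus.contrib.ketama import Ketama

    ring = Ketama(['10.0.0.1:6379', '10.0.0.2:6379'],
                  weights={'10.0.0.1:6379': 2})  # optional per-node weights
    print ring.get_node('some-key')    # same key always maps to the same node
    ring.remove_node('10.0.0.2:6379')  # rebuilds the ring without that node
    print ring.get_node('some-key')    # now served by a remaining node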
@@ -60,10 +60,12 @@ class Cluster(object):
     """
     Holds a cluster of connections.
     """
-    def __init__(self, hosts, router=None):
+
+    def __init__(self, hosts, router=None, max_connection_retries=20):
         self.hosts = hosts
         self.router = router
-
+        self.max_connection_retries = max_connection_retries
+
     def __len__(self):
         return len(self.hosts)
@@ -77,18 +79,20 @@ def __getattr__(self, attr):
         return CallProxy(self, attr)
     def _execute(self, attr, args, kwargs):
-        if self.router:
-            db_nums = self.router.get_db(self, attr, *args, **kwargs)
+        if self.router and self.router.retryable:
+            # The router supports retryable commands, so we want to run a
+            # separate algorithm for how we get connections to run commands on
+            # and then possibly retry
+            return self._retryable_execute(attr, *args, **kwargs)
         else:
-            db_nums = range(len(self))
-
-        results = [getattr(self.hosts[num], attr)(*args, **kwargs) for num in db_nums]
+            connections = self._connections_for(*args, **kwargs)
+            results = [getattr(conn, attr)(*args, **kwargs) for conn in connections]
-        # If we only had one db to query, we simply return that res
-        if len(results) == 1:
-            return results[0]
-
-        return results
+            # If we only had one db to query, we simply return that res
+            if len(results) == 1:
+                return results[0]
+            else:
+                return results
     def disconnect(self):
         """Disconnects all connections in cluster"""
@@ -103,18 +107,46 @@ def get_conn(self, *args, **kwargs):
         during all steps of the process. An example of this would be
         Redis pipelines.
         """
-        if self.router:
-            db_nums = self.router.get_db(self, 'get_conn', *args, **kwargs)
+        connections = self._connections_for(*args, **kwargs)
+
+        if len(connections) == 1:
+            return connections[0]
         else:
-            db_nums = range(len(self))
-
-        if len(db_nums) == 1:
-            return self[db_nums[0]]
-        return [self[n] for n in db_nums]
+            return connections
     def map(self, workers=None):
         return DistributedContextManager(self, workers)
+    def _retryable_execute(self, attr, *args, **kwargs):
+        db_nums = self._db_nums_for(*args, **kwargs)
+        retries = 0
+
+        while db_nums and retries <= self.max_connection_retries:
+            if len(db_nums) > 1:
+                raise Exception('Retryable execute only supported by routers which return 1 DB')
+            else:
+                connection = self[db_nums[0]]
+
+            try:
+                return getattr(connection, attr)(*args, **kwargs)
+            except tuple(connection.retryable_exceptions):
+                # We had a failure, so get a new db_num and try again, noting
+                # the DB number that just failed, so the backend can mark it as
+                # down
+                db_nums = self._db_nums_for(retry_for=db_nums[0], *args, **kwargs)
+                retries += 1
+        else:
+            raise Exception('Maximum number of connection retries exceeded')
+
+    def _db_nums_for(self, *args, **kwargs):
+        if self.router:
+            return self.router.get_db(self, 'get_conn', *args, **kwargs)
+        else:
+            return range(len(self))
+
+    def _connections_for(self, *args, **kwargs):
+        return [self[n] for n in self._db_nums_for(*args, **kwargs)]
+
 class CallProxy(object):
     """
     Handles routing function calls to the proper connection.
@@ -13,6 +13,9 @@ class BaseConnection(object):
     Child classes should implement at least
     connect() and disconnect() methods.
     """
+
+    retryable_exceptions = ()
+
     def __init__(self, num, **options):
         self._connection = None
         self.num = num
@@ -25,7 +28,7 @@ def connection(self):
         if self._connection is None:
             self._connection = self.connect()
         return self._connection
-
+
     def close(self):
         if self._connection:
             self.disconnect()
@@ -9,10 +9,15 @@
 from __future__ import absolute_import
 from redis import Redis as RedisClient
+from redis import exceptions as client_exceptions
 from nydus.db.backends import BaseConnection
 class Redis(BaseConnection):
+
+    # Exceptions that can be retried by this backend (a tuple of classes)
+    retryable_exceptions = (client_exceptions.ConnectionError,)
+
     def __init__(self, host='localhost', port=6379, db=0, timeout=None, **options):
         self.host = host
         self.port = port
@@ -13,6 +13,8 @@ class BaseRouter(object):
     # def __init__(self):
     #     pass
+    retryable = False
+
     def get_db(self, cluster, func, *args, **kwargs):
         """Return the first entry in the cluster"""
         return range(len(cluster))
View
@@ -9,10 +9,79 @@
 from binascii import crc32
 from nydus.db.routers import BaseRouter
+from nydus.contrib.ketama import Ketama
+
 class PartitionRouter(BaseRouter):
     def get_db(self, cluster, func, key=None, *args, **kwargs):
         # Assume first argument is a key
         if not key:
             return range(len(cluster))
         return [crc32(str(key)) % len(cluster)]
+
+
+class ConsistentHashingRouter(BaseRouter):
+
+    # Raised if all hosts in the hash have been marked as down
+    class HostListExhaused(Exception):
+        pass
+
+    # Whether this router can be retried against if a particular DB index
+    # it gave out did not work
+    retryable = True
+
+    # How many get_db() requests to serve while a host is down before the
+    # down hosts are put back into the hash and retried
+    attempt_reconnect_threshold = 100000
+
+    def __init__(self):
+        self._get_db_attempts = 0
+        self._down_connections = set()
+
+    # There is one instance of this class that lives inside the Cluster object
+    def get_db(self, cluster, func, key=None, *args, **kwargs):
+        self._setup_hash_and_connections(cluster, func, key=key, *args, **kwargs)
+
+        if not cluster:
+            return []
+        elif not key:
+            return range(len(cluster))
+        else:
+            return self._host_indexes_for(key, cluster)
+
+    def flush_down_connections(self):
+        for connection in self._down_connections:
+            self._hash.add_node(connection.host)
+
+        self._down_connections = set()
+
+    def _setup_hash_and_connections(self, cluster, func, key=None, *args, **kwargs):
+        # Create the hash if it doesn't exist yet
+        if not hasattr(self, '_hash'):
+            hostnames = [h.host for (i, h) in cluster.hosts.items()]
+            self._hash = Ketama(hostnames)
+
+        self._handle_host_retries(cluster, retry_for=kwargs.get('retry_for'))
+
+    def _handle_host_retries(self, cluster, retry_for):
+        self._get_db_attempts += 1
+
+        if self._get_db_attempts > self.attempt_reconnect_threshold:
+            self.flush_down_connections()
+            self._get_db_attempts = 0
+
+        if retry_for is not None:
+            self._mark_connection_as_down(cluster[retry_for])
+
+    def _mark_connection_as_down(self, connection):
+        self._hash.remove_node(connection.host)
+        self._down_connections.add(connection)
+
+    def _host_indexes_for(self, key, cluster):
+        found_hostname = self._hash.get_node(key)
+
+        if not found_hostname and len(self._down_connections) > 0:
+            raise self.HostListExhaused
+
+        return [i for (i, h) in cluster.hosts.items()
+                if h.host == found_hostname]