Permalink
Browse files

Improve consistent hashing router.

Summary:
This change makes a couple of improvements to the routers in order to better support the consistent hashing router.

1. Changes from using the host of the connection to using a new connection property called "identifier" to identify the connection in the consistent hash.  This allows multiple connections on the same hostname to both occur in the hash.
2. A Cluster object is more strategic about when it asks for retries from the router.  At present, routers only support retrying on a single host, but retryable routers may return more than one host.  The Cluster object now checks the len() of the hosts returned by the router's get_db() call and only performs the retryable_execute if that len is 1.  Routers are only supposed to return one host after a call to get_db() with a retry_for kwarg, but in case they do not, the check and raised exception are still left in.

Test Plan:
The existing tests still pass, along with the new tests added in this change.

Reviewers: dcramer, tail

CC:
  • Loading branch information...
1 parent 4530927 commit 51687571d7c4efe992f36f05a6de0364a86857ab @Fluxx Fluxx committed Jan 20, 2012
@@ -51,4 +51,9 @@ def __getattr__(self, attr):
return self.__dict__[attr]
else:
# Send to router
- return getattr(self.connection, attr)
+ return getattr(self.connection, attr)
+
+ @property
+ def identifier(self):
+ settings = ["%s=%s" % i for i in self.settings_dict.items()]
+ return self.backend.__name__ + ' '.join(settings)
View
@@ -79,11 +79,13 @@ def __getattr__(self, attr):
return CallProxy(self, attr)
def _execute(self, attr, args, kwargs):
- if self.router and self.router.retryable:
+ db_nums = self._db_nums_for(*args, **kwargs)
+
+ if self.router and len(db_nums) is 1 and self.router.retryable:
# The router supports retryable commands, so we want to run a
# separate algorithm for how we get connections to run commands on
# and then possibly retry
- self._retryable_execute(attr, *args, **kwargs)
+ self._retryable_execute(db_nums, attr, *args, **kwargs)
else:
connections = self._connections_for(*args, **kwargs)
results = [getattr(conn, attr)(*args, **kwargs) for conn in connections]
@@ -117,13 +119,12 @@ def get_conn(self, *args, **kwargs):
def map(self, workers=None):
return DistributedContextManager(self, workers)
- def _retryable_execute(self, attr, *args, **kwargs):
- db_nums = self._db_nums_for(*args, **kwargs)
+ def _retryable_execute(self, db_nums, attr, *args, **kwargs):
retries = 0
- while db_nums and retries <= self.max_connection_retries:
+ while retries <= self.max_connection_retries:
if len(db_nums) > 1:
- raise Exception('Retryable execute only supported by routers which return 1 DB')
+ raise Exception('Retryable router returned multiple DBs')
else:
connection = self[db_nums[0]]
@@ -20,8 +20,9 @@ def __init__(self, num, **options):
self._connection = None
self.num = num
- def __repr__(self):
- return '<%s: num=%s>' % (self.__class__.__name__, self.num)
+ @property
+ def identifier(self):
+ return repr(self)
@property
def connection(self):
@@ -25,6 +25,12 @@ def __init__(self, host='localhost', port=6379, db=0, timeout=None, **options):
self.timeout = timeout
super(Redis, self).__init__(**options)
+ @property
+ def identifier(self):
+ mapping = vars(self)
+ mapping['klass'] = self.__class__.__name__
+ return "redis://%(host)s:%(port)s/%(db)s" % mapping
+
def connect(self):
return RedisClient(host=self.host, port=self.port, db=self.db, socket_timeout=self.timeout)
View
@@ -21,6 +21,11 @@ def get_db(self, cluster, func, key=None, *args, **kwargs):
class ConsistentHashingRouter(BaseRouter):
+ '''
+ Router that returns host number based on a consistent hashing algorithm.
+ The consistent hashing algorithm only works if a key argument is provided.
+ If a key is not provided, then all hosts are returned.
+ '''
# Raised if all hosts in the hash have been marked as down
class HostListExhaused(Exception):
@@ -40,7 +45,7 @@ def __init__(self):
# There is one instance of this class that lives inside the Cluster object
def get_db(self, cluster, func, key=None, *args, **kwargs):
- self._setup_hash_and_connections(cluster, func, key=None, *args, **kwargs)
+ self._setup_hash_and_connections(cluster, *args, **kwargs)
if not cluster:
return []
@@ -51,15 +56,15 @@ def get_db(self, cluster, func, key=None, *args, **kwargs):
def flush_down_connections(self):
for connection in self._down_connections:
- self._hash.add_node(connection.host)
+ self._hash.add_node(connection.identifier)
self._down_connections = set()
- def _setup_hash_and_connections(self, cluster, func, key=None, *args, **kwargs):
+ def _setup_hash_and_connections(self, cluster, *args, **kwargs):
# Create the hash if it doesn't exist yet
if not hasattr(self, '_hash'):
- hostnames = [h.host for (i, h) in cluster.hosts.items()]
- self._hash = Ketama(hostnames)
+ strings = [h.identifier for (i, h) in cluster.hosts.items()]
+ self._hash = Ketama(strings)
self._handle_host_retries(cluster, retry_for=kwargs.get('retry_for'))
@@ -74,14 +79,14 @@ def _handle_host_retries(self, cluster, retry_for):
self._mark_connection_as_down(cluster[retry_for])
def _mark_connection_as_down(self, connection):
- self._hash.remove_node(connection.host)
+ self._hash.remove_node(connection.identifier)
self._down_connections.add(connection)
def _host_indexes_for(self, key, cluster):
- found_hostname = self._hash.get_node(key)
+ found = self._hash.get_node(key)
- if not found_hostname and len(self._down_connections) > 0:
+ if not found and len(self._down_connections) > 0:
raise self.HostListExhaused
return [i for (i, h) in cluster.hosts.items()
- if h.host is found_hostname]
+ if h.identifier == found]
@@ -79,6 +79,12 @@ def test_with_cluster(self):
cursor = p.execute('SELECT 1')
self.assertEquals(cursor.fetchone(), (1,))
+ def test_provides_identififer(self):
+ self.assertEqual(
+ "django.db.backends.sqlite3NAME=:memory: PORT=None HOST=None USER=None TEST_NAME=None PASSWORD=None OPTIONS={}",
+ self.db.identifier
+ )
+
# class DjangoPsycopg2Test(BaseTest):
# def setUp(self):
# from django.db.backends import postgresql_psycopg2
@@ -28,3 +28,6 @@ def test_with_cluster(self):
def test_provides_retryable_exceptions(self):
self.assertEquals(Redis.retryable_exceptions, redis.exceptions)
+
+ def test_provides_identifier(self):
+ self.assertEquals(self.redis.identifier, str(self.redis.identifier))
View
@@ -139,7 +139,7 @@ class FlakeyConnection(DummyConnection):
retryable_exceptions = [Exception]
- def foo(self):
+ def foo(self, *args, **kwargs):
if hasattr(self, 'already_failed'):
super(FlakeyConnection, self).foo()
else:
@@ -152,18 +152,28 @@ class RetryableRouter(DummyRouter):
def __init__(self):
self.kwargs_seen = []
+ self.key_args_seen = []
super(RetryableRouter, self).__init__()
def get_db(self, cluster, func, key=None, *args, **kwargs):
self.kwargs_seen.append(kwargs)
+ self.key_args_seen.append(key)
return [0]
-class ImposterRouter(DummyRouter):
+class InconsistentRouter(DummyRouter):
retryable = True
+ def __init__(self):
+ self.returned = False
+ super(InconsistentRouter, self).__init__()
+
def get_db(self, cluster, func, key=None, *args, **kwargs):
- return range(5)
+ if self.returned:
+ return range(5)
+ else:
+ self.returned = True
+ return [0]
class ScumbagConnection(DummyConnection):
@@ -196,6 +206,6 @@ def test_protection_from_infinate_loops(self):
self.assertRaises(Exception, cluster.foo)
def test_retryable_router_returning_multiple_dbs_raises_ecxeption(self):
- cluster = self.build_cluster(router=ImposterRouter)
- self.assertRaisesRegexp(Exception, 'only supported by routers',
+ cluster = self.build_cluster(router=InconsistentRouter, connection=ScumbagConnection)
+ self.assertRaisesRegexp(Exception, 'returned multiple DBs',
cluster.foo)
View
@@ -8,13 +8,21 @@
from . import BaseTest
from nydus.db import Cluster
+from nydus.db.backends import BaseConnection
from nydus.db.routers.redis import ConsistentHashingRouter
from nose import tools
-class DummyConnection(object):
+class DummyConnection(BaseConnection):
def __init__(self, i):
- self.host = 'host-' + str(i)
+ self.host = 'dummyhost'
+ self.i = i
+ super(DummyConnection, self).__init__(i)
+
+ @property
+ def identifier(self):
+ return "%s:%s" % (self.host, self.i)
+
class ConsistentHashingRouterTest(BaseTest):
@@ -57,36 +65,32 @@ def test_cluster_of_one_returns_one(self):
tools.assert_items_equal(['only_key'], self.get_db())
def test_multi_node_cluster_returns_correct_host(self):
- # This value "2" below is magic due to the halshing algo
- tools.assert_items_equal([1], self.get_db())
+ tools.assert_items_equal([2], self.get_db())
class RetryableTest(HashingTest):
- def test_is_retryable(self):
- tools.ok_(self.router.retryable)
-
def test_attempt_reconnect_threshold_is_set(self):
tools.assert_equal(self.router.attempt_reconnect_threshold, 100000)
def test_retry_gives_next_host_if_primary_is_offline(self):
- tools.assert_items_equal([1], self.get_db())
- tools.assert_items_equal([0], self.get_db(retry_for=1))
+ tools.assert_items_equal([2], self.get_db())
+ tools.assert_items_equal([4], self.get_db(retry_for=2))
def test_retry_host_change_is_sticky(self):
- tools.assert_items_equal([1], self.get_db())
- tools.assert_items_equal([0], self.get_db(retry_for=1))
+ tools.assert_items_equal([2], self.get_db())
+ tools.assert_items_equal([4], self.get_db(retry_for=2))
- tools.assert_items_equal([0], self.get_db())
+ tools.assert_items_equal([4], self.get_db())
def test_adds_back_down_host_once_attempt_reconnect_threshold_is_passed(self):
ConsistentHashingRouter.attempt_reconnect_threshold = 3
- tools.assert_items_equal([1], self.get_db())
- tools.assert_items_equal([0], self.get_db(retry_for=1))
- tools.assert_items_equal([0], self.get_db())
+ tools.assert_items_equal([2], self.get_db())
+ tools.assert_items_equal([4], self.get_db(retry_for=2))
+ tools.assert_items_equal([4], self.get_db())
# Router should add host 1 back to the pool now
- tools.assert_items_equal([1], self.get_db())
+ tools.assert_items_equal([2], self.get_db())
ConsistentHashingRouter.attempt_reconnect_threshold = 100000

0 comments on commit 5168757

Please sign in to comment.