Permalink
Browse files

Merge pull request #47 from philip-doctor/Only-One-Describe-At-A-Time

Only describe-ring one at a time
  • Loading branch information...
2 parents c3b615d + cf12d0d commit 860a03a0fafe71605e1a4316dfdd8d0c29094703 @nickmbailey nickmbailey committed Aug 13, 2014
Showing with 74 additions and 12 deletions.
  1. +43 −7 telephus/pool.py
  2. +31 −5 test/test_cassandraclusterpool.py
View
@@ -175,7 +175,10 @@ class CassandraPoolReconnectorFactory(protocol.ClientFactory):
# requests still get made in their right keyspaces).
keyspace = None
- def __init__(self, node, service, sasl_cred_factory=None):
+ def __init__(self, node, service, sasl_cred_factory=None, describe_lock=None):
+ if not describe_lock:
+ raise ValueError("Expected describe_lock, got None.")
+ self.describe_lock = describe_lock
self.node = node
# if self.service is None, don't bother doing anything. nobody loves us.
self.service = service
@@ -283,18 +286,35 @@ def my_login(self, creds):
def my_set_keyspace(self, keyspace):
return self.execute(ManagedThriftRequest('set_keyspace', keyspace))
+ def done_describing(self, result):
+ self.describe_lock.release_lock()
+ return result
+
+ def done_describing_err(self, f):
+ self.describe_lock.release_lock()
+ return f
+
def my_describe_ring(self, keyspace=None):
if keyspace is None or keyspace in SYSTEM_KEYSPACES:
d = self.my_pick_non_system_keyspace()
else:
d = defer.succeed(keyspace)
- d.addCallback(lambda k: self.execute(ManagedThriftRequest('describe_ring', k)))
- def suppress_no_keyspaces_error(f):
- f.trap(NoKeyspacesAvailable)
- return ()
+ if not self.describe_lock.get_lock():
+ self.describe_lock.set_lock(d)
+ d.addCallback(lambda k: self.execute(ManagedThriftRequest('describe_ring', k)))
+
+ def suppress_no_keyspaces_error(f):
+ f.trap(NoKeyspacesAvailable)
+ return ()
+
+ d.addCallback(self.done_describing)
+ d.addErrback(self.done_describing_err)
+ d.addErrback(suppress_no_keyspaces_error)
+
+ else:
+ d = self.describe_lock.get_lock()
- d.addErrback(suppress_no_keyspaces_error)
return d
def my_describe_version(self):
@@ -707,6 +727,7 @@ def __init__(self, seed_list, keyspace=None, creds=None, thrift_port=None,
being called too fast in a failure situation
"""
+ self.describing_ring = False
self.seed_list = list(seed_list)
if thrift_port is None:
thrift_port = self.default_cassandra_thrift_port
@@ -1024,9 +1045,24 @@ def schedule_future_fill_pool(self, seconds):
else:
self.future_fill_pool.reset(seconds)
+ def set_lock(self, d):
+ """"Accepts a deferred, when set any calls to get_lock will return this deferred. Used with describe_ring to
+ ensure that only one call to describe ring is active at a time, and other calls simply share the same
+ result."""
+ self.describing_ring = d
+
+ def get_lock(self):
+ """"Returns either False if the lock is not set, or a deferred that was specified in set_lock."""
+ return self.describing_ring
+
+ def release_lock(self):
+ """"When calls to describe_ring finish, the lock is set back to false so future calls can get new results from
+ describe ring."""
+ self.describing_ring = False
+
def make_conn(self, node):
self.log('Adding connection to %s' % (node,))
- f = self.conn_factory(node, self, self.sasl_cred_factory)
+ f = self.conn_factory(node, self, self.sasl_cred_factory, self)
bindaddr = self.bind_address
if bindaddr is not None and isinstance(bindaddr, str):
bindaddr = (bindaddr, 0)
@@ -121,12 +121,13 @@ def killWorkingNode(self):
@contextlib.contextmanager
def cluster_and_pool(self, num_nodes=10, pool_size=5, start=True,
- cluster_class=None, node_discovery=True):
+ cluster_class=None, node_discovery=True, fill_throttle=0.0):
if cluster_class is None:
cluster_class = FakeCassandraCluster
cluster = cluster_class(num_nodes, start_port=self.start_port)
pool = CassandraClusterPool([cluster.iface], thrift_port=self.start_port,
- pool_size=pool_size, auto_node_discovery=node_discovery)
+ pool_size=pool_size, auto_node_discovery=node_discovery,
+ fill_pool_throttle=fill_throttle)
if start:
cluster.startService()
pool.startService()
@@ -384,11 +385,10 @@ def test_adjust_pool_size(self):
# turn up pool size once other nodes are known
self.pool.adjustPoolSize(pool_size)
- yield deferwait(0.1)
+ yield deferwait(.1)
self.assertNumConnections(pool_size)
self.assertNumUniqueConnections(pool_size)
-
dlist = []
for x in range(pool_size):
d = self.pool.get('key001', 'Standard1/wait=1.0',
@@ -871,6 +871,32 @@ def test_auto_discovery_off(self):
yield deferwait(0.5)
self.assertEqual(len(self.pool.nodes), 1)
+ def test_describe_ring_not_locked_after_creation(self):
+ num_nodes = 5
+ pool_size = 10
+ with self.cluster_and_pool(num_nodes=num_nodes, pool_size=pool_size):
+ self.assertFalse(self.pool.describing_ring)
+
+ def test_describe_errors_unlock_describe_ring(self):
+ num_nodes = 5
+ pool_size = 10
+ with self.cluster_and_pool(num_nodes=num_nodes, pool_size=pool_size):
+ conn = list(self.pool.connectors)[0]
+
+ def exec_raises(x, y=None):
+ import time
+ time.sleep(1)
+ raise ValueError("Bad things.")
+
+ try:
+ conn.execute = exec_raises
+ x = conn.my_describe_ring()
+ self.assertTrue(self.pool.describing_ring)
+ yield x
+ self.assertTrue(False, "Expected exception.")
+ except ValueError, e:
+ self.assertFalse(self.pool.describing_ring)
+
if cassanova:
class EnhancedCassanovaInterface(cassanova.CassanovaInterface):
@@ -946,4 +972,4 @@ def stopService(self):
d.addErrback(lambda n: None)
if not cassanova:
- del CassandraClusterPoolTest
+ raise AssertionError("Cassanova is required to run all tests")

0 comments on commit 860a03a

Please sign in to comment.