From aa6db8fbc0c851dd7dc5cc467fc1cab8c7f0ad45 Mon Sep 17 00:00:00 2001 From: paul cannon Date: Fri, 4 Mar 2011 21:45:24 -0600 Subject: [PATCH] update tests to use cassanova --- test/test_cassandraclusterpool.py | 451 ++++++++++++++++++------------ 1 file changed, 266 insertions(+), 185 deletions(-) diff --git a/test/test_cassandraclusterpool.py b/test/test_cassandraclusterpool.py index 2658a70..014c2ec 100644 --- a/test/test_cassandraclusterpool.py +++ b/test/test_cassandraclusterpool.py @@ -1,94 +1,247 @@ -import sys -import pickle +from __future__ import with_statement + import random import contextlib -from functools import partial from twisted.trial import unittest -from twisted.internet import address, defer, reactor, protocol -from twisted.application import service, internet -from twisted.protocols import basic +from twisted.internet import defer, reactor from telephus.pool import (CassandraClusterPool, CassandraPoolReconnectorFactory, CassandraPoolParticipantClient) from telephus.cassandra import Cassandra, constants from telephus.cassandra.ttypes import * +from Cassanova import cassanova from twisted.python import log -def deferwait(s): +def deferwait(s, result=None): d = defer.Deferred() - reactor.callLater(s, d.callback, None) + reactor.callLater(s, d.callback, result) return d class CassandraClusterPoolTest(unittest.TestCase): - def setUp(self): - self.cass = FakeCassandraCluster(10) - self.cass.startService() - - def tearDown(self): - self.cass.stopService() - del self.cass + start_port = 44449 + ksname = 'TestKeyspace' @contextlib.contextmanager - def pool(self, *a, **kw): - c = MockedConnectionCassandraClusterPool(self.cass, *a, **kw) - c.startService() + def cluster_and_pool(self, num_nodes=10, pool_size=5, start=True): + cluster = FakeCassandraCluster(num_nodes, start_port=self.start_port) + pool = CassandraClusterPool([cluster.iface], thrift_port=self.start_port, + pool_size=pool_size) + if start: + cluster.startService() + pool.startService() + self.cluster = cluster + self.pool = pool try: - yield c + yield cluster, pool finally: - c.stopService() + del self.pool + del self.cluster + if pool.running: + pool.stopService() + if cluster.running: + cluster.stopService() + + @defer.inlineCallbacks + def make_standard_cfs(self, ksname=None): + if ksname is None: + ksname = self.ksname + yield self.pool.system_add_keyspace( + KsDef( + name=ksname, + replication_factor=1, + strategy_class='org.apache.cassandra.locator.SimpleStrategy', + cf_defs=( + CfDef( + keyspace=ksname, + name='Standard1', + column_type='Standard' + ), + CfDef( + keyspace=ksname, + name='Super1', + column_type='Super' + ) + ) + ) + ) + yield self.pool.set_keyspace(ksname) + yield self.pool.insert('key', 'Standard1', column='col', value='value') + + @defer.inlineCallbacks + def insert_dumb_rows(self, ksname=None, cf=None, numkeys=10, numcols=10, + timestamp=0): + if ksname is None: + ksname = self.ksname + if cf is None: + cf = 'Standard1' + yield self.pool.set_keyspace(ksname) + + mutmap = {} + for k in range(numkeys): + key = 'key%03d' % k + cols = [Column(name='%s-%03d-%03d' % (ksname, k, n), + value='val-%s-%03d-%03d' % (ksname, k, n), + timestamp=timestamp) + for n in range(numcols)] + mutmap[key] = {cf: cols} + yield self.pool.batch_mutate(mutationmap=mutmap) @defer.inlineCallbacks def test_set_keyspace(self): - with self.pool(pool_size=10) as c: - yield c.set_keyspace('keyspace_foo') - yield c.get('key', 'cf') - # should have changed two- not more or less - self.assertEqual(sorted(n.in_keyspace for n in self.cass.nodes), - sorted([None] * 8 + ['keyspace_foo'] * 2)) + pool_size=10 + num_nodes=4 + + with self.cluster_and_pool(num_nodes=num_nodes, pool_size=pool_size): + yield self.make_standard_cfs('KS1') + yield self.make_standard_cfs('KS2') + + yield self.insert_dumb_rows('KS1', numcols=pool_size+2) + yield self.insert_dumb_rows('KS2', numcols=pool_size+2) + + yield self.pool.set_keyspace('KS1') + first = self.pool.get('key000', 'Standard1/wait=2.0', 'KS1-000-000') + + yield self.pool.set_keyspace('KS2') + dfrds1 = [] + for x in range(pool_size + 1): + d = self.pool.get('key001', 'Standard1/wait=0.1', 'KS2-001-%03d' % x) + dfrds1.append(d) + + # all pool connections should have sent a real set_keyspace by + # now; change it up again + + yield self.pool.set_keyspace('KS1') + dfrds2 = [] + for x in range(pool_size + 1): + d = self.pool.get('key002', 'Standard1/wait=0.1', 'KS1-002-%03d' % x) + dfrds2.append(d) + + result = yield defer.DeferredList(dfrds1, consumeErrors=True) + for n, (succ, res) in enumerate(result): + self.assert_(succ, 'Failure on item %d was %s' % (n, res)) + res = res.column.value + self.assertEqual(res, 'val-KS2-001-%03d' % n) + + result = yield defer.DeferredList(dfrds2) + for n, (succ, res) in enumerate(result): + self.assert_(succ, 'Failure was %s' % res) + res = res.column.value + self.assertEqual(res, 'val-KS1-002-%03d' % n) + + yield self.pool.set_keyspace('KS2') + + result = (yield first).column.value + self.assertEqual(result, 'val-KS1-000-000') + + final = yield self.pool.get('key003', 'Standard1', 'KS2-003-005') + self.assertEqual(final.column.value, 'val-KS2-003-005') @defer.inlineCallbacks def test_bad_set_keyspace(self): - with self.pool() as c: - yield self.assertFailure(c.set_keyspace('i-dont-exist'), + with self.cluster_and_pool(): + yield self.make_standard_cfs('KS1') + yield self.insert_dumb_rows('KS1') + + yield self.assertFailure(self.pool.set_keyspace('i-dont-exist'), InvalidRequestException) - yield c.get('key', 'cf') - # should have changed none - self.assertEqual([n.in_keyspace for n in self.cass.nodes], - [None] * 10) + self.flushLoggedErrors() + + # should still be in KS1 + result = yield self.pool.get('key005', 'Standard1', 'KS1-005-000') + self.assertEqual(result.column.value, 'val-KS1-005-000') @defer.inlineCallbacks def test_ring_inspection(self): - c = MockedConnectionCassandraClusterPool(self.cass, use_seeds=1, pool_size=5) - self.assertEqual(len(c.seed_list), 1) - c.startService() - yield c.describe_cluster_name() - self.assertEqual(sorted((n.host, n.port) for n in c.nodes), - sorted(('localhost', f.port) for f in self.cass.nodes)) - yield deferwait(1) - self.assertEqual(c.num_active_conns(), 5) - c.stopService() + with self.cluster_and_pool(start=False): + self.assertEqual(len(self.pool.seed_list), 1) + self.cluster.startService() + self.pool.startService() + yield self.pool.describe_cluster_name() + self.assertEqual(len(self.pool.nodes), len(self.cluster.ring), msg=str(sorted(self.pool.nodes))) @defer.inlineCallbacks - def test_storm(self): - with self.pool() as c: - for i in xrange(500): - r = str(random.randint(1, 10000)) - ans = yield c.get('echo', r) - self.assertEqual(r, ans) - for n in self.cass.nodes: - self.assertApproximates(50.0, n.reqs_handled, 10) + def test_keyspace_connection(self): + numkeys = 10 + numcols = 10 + tries = 500 + + with self.cluster_and_pool(): + yield self.make_standard_cfs('KS1') + yield self.make_standard_cfs('KS2') + yield self.insert_dumb_rows('KS1', numkeys=numkeys, numcols=numcols) + yield self.insert_dumb_rows('KS2', numkeys=numkeys, numcols=numcols) + + ksconns = dict((ksname, self.pool.keyspaceConnection(ksname)) + for ksname in ('KS1', 'KS2')) + + dlist = [] + for i in xrange(tries): + keyspace = 'KS%d' % random.randint(1, 2) + keynum = '%03d' % random.randint(0, numkeys-1) + key = 'key' + keynum + col = '%s-%s-%03d' % (keyspace, keynum, random.randint(0, numcols-1)) + d = ksconns[keyspace].get(key, 'Standard1', col) + d.addCallback(lambda c: c.column.value) + d.addCallback(self.assertEqual, 'val-' + col) + dlist.append(d) + results = yield defer.DeferredList(dlist, consumeErrors=True) + for succ, answer in results: + if not succ: + answer.raiseException() @defer.inlineCallbacks - def test_storm2(self): - with self.pool() as c: - for i in xrange(500): - r = str(random.randint(1, 10000)) - wait = str(random.random() / 200) - ans = yield c.get('echo_with_wait', r, wait) - self.assertEqual(r, ans) - for n in self.cass.nodes: - self.assertApproximates(50.0, n.reqs_handled, 10) + def test_storm(self): + numkeys = 10 + numcols = 10 + tries = 500 + + with self.cluster_and_pool(): + yield self.make_standard_cfs() + yield self.insert_dumb_rows(numkeys=numkeys, numcols=numcols) + + dlist = [] + for i in xrange(tries): + keynum = '%03d' % random.randint(0, numkeys-1) + key = 'key' + keynum + col = '%s-%s-%03d' % (self.ksname, keynum, random.randint(0, numcols-1)) + d = self.pool.get(key, 'Standard1', col) + d.addCallback(lambda c: c.column.value) + d.addCallback(self.assertEqual, 'val-' + col) + dlist.append(d) + results = yield defer.DeferredList(dlist, consumeErrors=True) + for succ, answer in results: + if not succ: + answer.raiseException() + @defer.inlineCallbacks def test_retrying(self): + with self.cluster_and_pool(): + yield self.make_standard_cfs() + yield self.insert_dumb_rows() + + d = self.pool.get('key000', 'Standard1/wait=1.0', '%s-000-000' % self.ksname, + retries=3) + + # give the timed 'get' a chance to start + yield deferwait(0.2) + + # kill the backend handling the query + conns = self.cluster.get_working_connections() + self.assertEqual(len(conns), 1) + node, proto = conns[0] + proto.transport.loseConnection() + + # allow reconnect + yield deferwait(0.2) + newconns = self.cluster.get_working_connections() + log.msg('newconns: %r' % (newconns,)) + self.assertEqual(len(newconns), 1) + newnode, newproto = newconns[0] + # we want the preference to be reconnecting the same node + self.assertEqual(node, newnode) + answer = (yield d).column.value + self.assertEqual(answer, 'val-%s-000-000' % self.ksname) + + def test_resubmit_to_new_conn(self): pass def test_lower_pool_size(self): @@ -103,6 +256,9 @@ def test_connection_leveling(self): def test_huge_pool(self): pass + def test_finish_and_die(self): + pass + def test_problematic_conns(self): pass @@ -112,9 +268,6 @@ def test_manual_node_add(self): def test_manual_node_remove(self): pass - def test_keyspace_connection(self): - pass - def test_conn_loss_during_idle(self): pass @@ -127,129 +280,57 @@ def test_last_conn_loss_during_idle(self): def test_last_conn_loss_during_request(self): pass -class StringQueueProtocol(basic.Int16StringReceiver): - def __init__(self): - self.recvq = defer.DeferredQueue() - - def stringReceived(self, s): - self.recvq.put(s) - - def readString(self): - return self.recvq.get() - - def sendObj(self, obj): - return self.sendString(pickle.dumps(obj)) - - def readObj(self): - return self.readString().addCallback(pickle.loads) - -class FakeCassandraNodeProtocol(StringQueueProtocol): - def connectionMade(self): - StringQueueProtocol.connectionMade(self) - log.msg("got connection at port %d" % self.factory.port) - - def stringReceived(self, s): - self.factory.reqs_handled += 1 - methname, args = pickle.loads(s) - try: - meth = getattr(self, methname) - except AttributeError: - meth = partial(self.generic_response, methname) - d = defer.maybeDeferred(meth, *args) - d.addErrback(lambda f: f.value) - d.addCallback(self.sendObj) - - def set_keyspace(self, ksname): - if ksname not in [k.name for k in self.factory.keyspaces]: - raise InvalidRequestException('no such keyspace: %s' % ksname) - self.factory.in_keyspace = ksname - - def describe_keyspaces(self): - return self.factory.keyspaces - - def describe_ring(self, keyspace): - ring = self.factory.fakecluster.nodes - return [TokenRange(0, 0, ['localhost:%d' % r.port]) for r in ring] - - def get(self, key, cf, *a): - if key == 'echo': - return cf.column_family - elif key == 'echo_with_wait': - d = deferwait(float(cf.column)) - d.addCallback(lambda _: cf.column_family) - return d - return None - - def generic_response(self, methname, *a): - d = defer.Deferred() - reactor.callLater(0.01, d.callback, None) +class EnhancedCassanovaInterface(cassanova.CassanovaInterface): + """ + Add a way to request operations which are guaranteed to take (at least) a + given amount of time, for easier testing of things which might take a long + time in the real world + """ + + def get(self, key, column_path, consistency_level): + args = [] + if '/' in column_path.column_family: + parts = column_path.column_family.split('/') + column_path.column_family = parts[0] + args = parts[1:] + d = defer.maybeDeferred(cassanova.CassanovaInterface.get, self, key, + column_path, consistency_level) + for arg in args: + if arg.startswith('wait='): + d.addCallback(lambda a: deferwait(float(arg[5:]), a)) return d -class FakeCassandraNodeFactory(protocol.ServerFactory): - protocol = FakeCassandraNodeProtocol - keyspaces = [ - KsDef('keyspace_foo', - cf_defs=[ - CfDef('keyspace_foo', 'cf_1', column_type='Standard'), - CfDef('keyspace_foo', 'scf_1', column_type='Super'), - ]), - KsDef('keyspace_bar') - ] - - def __init__(self, fakecluster): - self.fakecluster = fakecluster - self.in_keyspace = None - self.reqs_handled = 0 - - def __repr__(self): - return '' % (self.port,) - -class FakeCassandraCluster(service.MultiService): - def __init__(self, num_nodes, start_port=41356): - service.MultiService.__init__(self) - self.nodes = [] - for x in xrange(num_nodes): - f = FakeCassandraNodeFactory(self) - port = start_port + x - serv = internet.TCPServer(port, f) - serv.setServiceParent(self) - f.port = port - self.nodes.append(f) - -class MockedCassandraPoolParticipantClient(StringQueueProtocol): - def __init__(self): - StringQueueProtocol.__init__(self) - self.iface = Cassandra.Iface - self.client = self - - def connectionMade(self): - StringQueueProtocol.connectionMade(self) - self.factory.clientConnectionMade(self) - - def __getattr__(self, name): - if name in self.iface: - return self.make_faker(name) - raise AttributeError(name) - - def make_faker(self, methodname): - def do_cmd(*args): - self.sendObj((methodname, args)) - return self.readObj().addCallback(self.raise_if_exc) - return do_cmd - - def raise_if_exc(self, x): - if isinstance(x, Exception): - raise x - return x - -class MockedCassandraPoolReconnectorFactory(CassandraPoolReconnectorFactory): - protocol = MockedCassandraPoolParticipantClient - -class MockedConnectionCassandraClusterPool(CassandraClusterPool): - conn_factory = MockedCassandraPoolReconnectorFactory - - def __init__(self, fakecluster, *a, **kw): - seed_hosts = [('localhost', f.port) for f in fakecluster.nodes] - use_seeds = kw.pop('use_seeds', len(seed_hosts)) - seed_hosts = seed_hosts[:use_seeds] - CassandraClusterPool.__init__(self, seed_hosts, *a, **kw) +class EnhancedCassanovaFactory(cassanova.CassanovaFactory): + handler_factory = EnhancedCassanovaInterface + +class EnhancedCassanovaNode(cassanova.CassanovaNode): + factory = EnhancedCassanovaFactory + + def endpoint_str(self): + return '%s:%d' % (self.addr.host, self.addr.port) + +class FakeCassandraCluster(cassanova.CassanovaService): + """ + Tweak the standard Cassanova service to allow nodes to run on the same + interface, but different ports. CassandraClusterPool already knows how + to understand the 'host:port' type of endpoint description in + describe_ring output. + """ + + def __init__(self, num_nodes, start_port=41356, interface='127.0.0.1'): + cassanova.CassanovaService.__init__(self, start_port) + self.iface = interface + for n in range(num_nodes): + self.add_node_on_port(start_port + n) + # make a non-system keyspace so that describe_ring can work + self.keyspaces['dummy'] = cassanova.KsDef( + 'dummy', + replication_factor=1, + strategy_class='org.apache.cassandra.locator.SimpleStrategy', + cf_defs=[] + ) + + def add_node_on_port(self, port, token=None): + node = EnhancedCassanovaNode(port, self.iface, token=token) + node.setServiceParent(self) + self.ring[node.mytoken] = node