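"""
Tests for telephus.pool.CassandraClusterPool, run against a fake in-process
Cassandra cluster built from Cassanova nodes (see FakeCassandraCluster below).
The test case is removed entirely when Cassanova is not installed.
"""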
from __future__ import with_statement

import random
import contextlib
from time import time
from itertools import groupby
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python import log
from telephus.pool import (CassandraClusterPool, CassandraPoolReconnectorFactory,
                           CassandraPoolParticipantClient, TTransport)
from telephus import translate
from telephus.cassandra.c08 import Cassandra
from telephus.cassandra.ttypes import *

try:
    from Cassanova import cassanova
except ImportError:
    cassanova = None

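# Return a Deferred that fires with `result` after `s` seconds; cancelling the
# Deferred cancels the underlying delayed call.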
def deferwait(s, result=None):
    def canceller(my_d):
        dcall.cancel()
    d = defer.Deferred(canceller=canceller)
    dcall = reactor.callLater(s, d.callback, result)
    return d

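# Cancel `d` if it has not fired within `waittime` seconds; the pending timeout
# is cleared as soon as `d` fires either way.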
def addtimeout(d, waittime):
    timeouter = reactor.callLater(waittime, d.cancel)
    def canceltimeout(x):
        if timeouter.active():
            timeouter.cancel()
        return x
    d.addBoth(canceltimeout)

class CassandraClusterPoolTest(unittest.TestCase):
    start_port = 44449
    ksname = 'TestKeyspace'

    def assertFired(self, d):
        self.assert_(d.called, msg='%s has not been fired' % (d,))

    def assertNotFired(self, d):
        self.assertNot(d.called, msg='Expected %s not to have been fired, but'
                                     ' it has been fired.' % (d,))

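    # The assert* helpers below inspect the fake cluster's record of open
    # connections and in-flight requests, so tests can check pool behavior
    # without poking at CassandraClusterPool internals.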
    def assertNumConnections(self, num):
        conns = self.cluster.get_connections()
        self.assertEqual(len(conns), num,
                         msg='Expected %d existing connections to cluster, but'
                             ' %d found.' % (num, len(conns)))
        return conns

    def assertNumUniqueConnections(self, num):
        conns = self.cluster.get_connections()
        conns = set(n for (n,p) in conns)
        self.assertEqual(len(conns), num,
                         msg='Expected %d unique nodes in cluster with existing'
                             ' connections, but %d found. Whole set: %r'
                             % (num, len(conns), sorted(conns)))
        return conns

    def assertNumWorkers(self, num):
        workers = self.cluster.get_working_connections()
        self.assertEqual(len(workers), num,
                         msg='Expected %d pending requests being worked on in '
                             'cluster, but %d found' % (num, len(workers)))
        return workers

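    # The kill* helpers simulate failures: dropping a single connection versus
    # stopping an entire fake node, either while idle or while a request is in
    # flight on that connection.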
    def killSomeConn(self):
        conns = self.cluster.get_connections()
        self.assertNotEqual(len(conns), 0)
        node, proto = conns[0]
        proto.transport.loseConnection()
        return proto

    def killSomeNode(self):
        conns = self.cluster.get_connections()
        self.assertNotEqual(len(conns), 0)
        node, proto = conns[0]
        node.stopService()
        return node

    def killWorkingConn(self):
        conns = self.cluster.get_working_connections()
        self.assertNotEqual(len(conns), 0)
        node, proto = conns[0]
        proto.transport.loseConnection()
        return proto

    def killWorkingNode(self):
        conns = self.cluster.get_working_connections()
        self.assertNotEqual(len(conns), 0)
        node, proto = conns[0]
        node.stopService()
        return node

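    # cluster_and_pool is the central fixture: used as
    # `with self.cluster_and_pool(num_nodes=..., pool_size=...):`, it builds a
    # fake multi-node cluster plus a connected CassandraClusterPool and tears
    # both down when the block exits.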
    @contextlib.contextmanager
    def cluster_and_pool(self, num_nodes=10, pool_size=5, start=True,
                         cluster_class=None, api_version=None):
        if cluster_class is None:
            cluster_class = FakeCassandraCluster
        cluster = cluster_class(num_nodes, start_port=self.start_port)
        pool = CassandraClusterPool([cluster.iface], thrift_port=self.start_port,
                                    pool_size=pool_size, api_version=api_version)
        if start:
            cluster.startService()
            pool.startService()
        self.cluster = cluster
        self.pool = pool
        try:
            yield cluster, pool
        finally:
            del self.pool
            del self.cluster
            if pool.running:
                pool.stopService()
            if cluster.running:
                cluster.stopService()

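    # make_standard_cfs creates a keyspace with 'Standard1' and 'Super1' column
    # families; insert_dumb_rows fills Standard1 with predictable
    # key/column/value triples ('keyNNN' / '<ks>-NNN-MMM' / 'val-<ks>-NNN-MMM')
    # that the assertions below rely on.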
    @defer.inlineCallbacks
    def make_standard_cfs(self, ksname=None):
        if ksname is None:
            ksname = self.ksname
        yield self.pool.system_add_keyspace(
            KsDef(
                name=ksname,
                replication_factor=1,
                strategy_class='org.apache.cassandra.locator.SimpleStrategy',
                cf_defs=(
                    CfDef(
                        keyspace=ksname,
                        name='Standard1',
                        column_type='Standard'
                    ),
                    CfDef(
                        keyspace=ksname,
                        name='Super1',
                        column_type='Super'
                    )
                )
            )
        )
        yield self.pool.set_keyspace(ksname)
        yield self.pool.insert('key', 'Standard1', column='col', value='value')

    @defer.inlineCallbacks
    def insert_dumb_rows(self, ksname=None, cf=None, numkeys=10, numcols=10,
                         timestamp=0):
        if ksname is None:
            ksname = self.ksname
        if cf is None:
            cf = 'Standard1'
        yield self.pool.set_keyspace(ksname)

        mutmap = {}
        for k in range(numkeys):
            key = 'key%03d' % k
            cols = [Column(name='%s-%03d-%03d' % (ksname, k, n),
                           value='val-%s-%03d-%03d' % (ksname, k, n),
                           timestamp=timestamp)
                    for n in range(numcols)]
            mutmap[key] = {cf: cols}
        yield self.pool.batch_mutate(mutationmap=mutmap)

    @defer.inlineCallbacks
    def test_set_keyspace(self):
        pool_size = 10
        num_nodes = 4

        with self.cluster_and_pool(num_nodes=num_nodes, pool_size=pool_size):
            yield self.make_standard_cfs('KS1')
            yield self.make_standard_cfs('KS2')

            yield self.insert_dumb_rows('KS1', numcols=pool_size+2)
            yield self.insert_dumb_rows('KS2', numcols=pool_size+2)

            yield self.pool.set_keyspace('KS1')
            first = self.pool.get('key000', 'Standard1/wait=2.0', 'KS1-000-000')

            yield self.pool.set_keyspace('KS2')
            dfrds1 = []
            for x in range(pool_size + 1):
                d = self.pool.get('key001', 'Standard1/wait=0.1', 'KS2-001-%03d' % x)
                dfrds1.append(d)

            # all pool connections should have sent a real set_keyspace by
            # now; change it up again

            yield self.pool.set_keyspace('KS1')
            dfrds2 = []
            for x in range(pool_size + 1):
                d = self.pool.get('key002', 'Standard1/wait=0.1', 'KS1-002-%03d' % x)
                dfrds2.append(d)

            result = yield defer.DeferredList(dfrds1, consumeErrors=True)
            for n, (succ, res) in enumerate(result):
                self.assert_(succ, 'Failure on item %d was %s' % (n, res))
                res = res.column.value
                self.assertEqual(res, 'val-KS2-001-%03d' % n)

            result = yield defer.DeferredList(dfrds2)
            for n, (succ, res) in enumerate(result):
                self.assert_(succ, 'Failure was %s' % res)
                res = res.column.value
                self.assertEqual(res, 'val-KS1-002-%03d' % n)

            yield self.pool.set_keyspace('KS2')

            result = (yield first).column.value
            self.assertEqual(result, 'val-KS1-000-000')

            final = yield self.pool.get('key003', 'Standard1', 'KS2-003-005')
            self.assertEqual(final.column.value, 'val-KS2-003-005')

    @defer.inlineCallbacks
    def test_bad_set_keyspace(self):
        with self.cluster_and_pool():
            yield self.make_standard_cfs('KS1')
            yield self.insert_dumb_rows('KS1')

            yield self.assertFailure(self.pool.set_keyspace('i-dont-exist'),
                                     InvalidRequestException)
            self.flushLoggedErrors()

            # should still be in KS1
            result = yield self.pool.get('key005', 'Standard1', 'KS1-005-000')
            self.assertEqual(result.column.value, 'val-KS1-005-000')

    @defer.inlineCallbacks
    def test_ring_inspection(self):
        with self.cluster_and_pool(start=False):
            self.assertEqual(len(self.pool.seed_list), 1)
            self.cluster.startService()
            self.pool.startService()
            yield self.pool.describe_cluster_name()
            self.assertEqual(len(self.pool.nodes), len(self.cluster.ring))

    @defer.inlineCallbacks
    def test_keyspace_connection(self):
        numkeys = 10
        numcols = 10
        tries = 500

        with self.cluster_and_pool():
            yield self.make_standard_cfs('KS1')
            yield self.make_standard_cfs('KS2')
            yield self.insert_dumb_rows('KS1', numkeys=numkeys, numcols=numcols)
            yield self.insert_dumb_rows('KS2', numkeys=numkeys, numcols=numcols)

            ksconns = dict((ksname, self.pool.keyspaceConnection(ksname))
                           for ksname in ('KS1', 'KS2'))

            dlist = []
            for i in xrange(tries):
                keyspace = 'KS%d' % random.randint(1, 2)
                keynum = '%03d' % random.randint(0, numkeys-1)
                key = 'key' + keynum
                col = '%s-%s-%03d' % (keyspace, keynum, random.randint(0, numcols-1))
                d = ksconns[keyspace].get(key, 'Standard1', col)
                d.addCallback(lambda c: c.column.value)
                d.addCallback(self.assertEqual, 'val-' + col)
                dlist.append(d)
            results = yield defer.DeferredList(dlist, consumeErrors=True)
            for succ, answer in results:
                if not succ:
                    answer.raiseException()

    @defer.inlineCallbacks
    def test_storm(self):
        numkeys = 10
        numcols = 10
        tries = 500

        with self.cluster_and_pool():
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows(numkeys=numkeys, numcols=numcols)

            dlist = []
            for i in xrange(tries):
                keynum = '%03d' % random.randint(0, numkeys-1)
                key = 'key' + keynum
                col = '%s-%s-%03d' % (self.ksname, keynum, random.randint(0, numcols-1))
                d = self.pool.get(key, 'Standard1', col)
                d.addCallback(lambda c: c.column.value)
                d.addCallback(self.assertEqual, 'val-' + col)
                dlist.append(d)
            results = yield defer.DeferredList(dlist, consumeErrors=True)
            for succ, answer in results:
                if not succ:
                    answer.raiseException()

    @defer.inlineCallbacks
    def test_retrying(self):
        with self.cluster_and_pool():
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            d = self.pool.get('key000', 'Standard1/wait=1.0', '%s-000-000' % self.ksname,
                              retries=3)

            # give the timed 'get' a chance to start
            yield deferwait(0.05)

            workers = self.assertNumWorkers(1)
            self.killWorkingConn()

            # allow reconnect
            yield deferwait(0.1)

            newworkers = self.assertNumWorkers(1)

            # we want the preference to be reconnecting the same node
            self.assertEqual(workers[0][0], newworkers[0][0])
            answer = (yield d).column.value
            self.assertEqual(answer, 'val-%s-000-000' % self.ksname)
            self.flushLoggedErrors()

    @defer.inlineCallbacks
    def test_resubmit_to_new_conn(self):
        pool_size = 8

        with self.cluster_and_pool(pool_size=1):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            # turn up pool size once other nodes are known
            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.1)

            d = self.pool.get('key005', 'Standard1/wait=1.0', '%s-005-000' % self.ksname,
                              retries=3)

            # give the timed 'get' a chance to start
            yield deferwait(0.1)

            workers = self.assertNumWorkers(1)
            node = self.killWorkingNode()

            # allow reconnect
            yield deferwait(0.5)
            newworkers = self.assertNumWorkers(1)

            # reconnect should have been to a different node
            self.assertNotEqual(workers[0][0], newworkers[0][0])

            answer = (yield d).column.value
            self.assertEqual(answer, 'val-%s-005-000' % self.ksname)

            self.flushLoggedErrors()

    @defer.inlineCallbacks
    def test_adjust_pool_size(self):
        pool_size = 8
        diminish_by = 2

        with self.cluster_and_pool(pool_size=1):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            # turn up pool size once other nodes are known
            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.1)

            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(pool_size)

            dlist = []
            for x in range(pool_size):
                d = self.pool.get('key001', 'Standard1/wait=1.0',
                                  '%s-001-002' % self.ksname, retries=0)
                d.addCallback(lambda c: c.column.value)
                d.addCallback(self.assertEqual, 'val-%s-001-002' % self.ksname)
                dlist.append(d)

            yield deferwait(0.1)

            for d in dlist:
                self.assertNotFired(d)
            self.assertNumConnections(pool_size)
            self.assertNumWorkers(pool_size)
            self.assertNumUniqueConnections(pool_size)

            # turn down pool size
            self.pool.adjustPoolSize(pool_size - diminish_by)
            yield deferwait(0.1)

            # still pool_size conns until the ongoing requests finish
            for d in dlist:
                self.assertNotFired(d)
            self.assertNumConnections(pool_size)
            self.assertEqual(len(self.pool.dying_conns), diminish_by)

            result = yield defer.DeferredList(dlist, consumeErrors=True)
            for succ, answer in result:
                if not succ:
                    answer.raiseException()
            yield deferwait(0.1)

            self.assertNumConnections(pool_size - diminish_by)
            self.assertNumWorkers(0)

    @defer.inlineCallbacks
    def test_zero_retries(self):
        with self.cluster_and_pool():
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()
            d = self.pool.get('key006', 'Standard1/wait=0.5',
                              '%s-006-002' % self.ksname, retries=0)

            yield deferwait(0.05)
            self.assertNumWorkers(1)

            # kill the connection handling the query- an immediate retry
            # should work, if a retry is attempted
            self.killWorkingConn()

            yield self.assertFailure(d, TTransport.TTransportException)

            self.flushLoggedErrors()

    @defer.inlineCallbacks
    def test_exhaust_retries(self):
        retries = 3
        num_nodes = pool_size = retries + 2

        with self.cluster_and_pool(num_nodes=num_nodes, pool_size=1):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            # turn up pool size once other nodes are known
            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.2)

            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(pool_size)

            d = self.pool.get('key002', 'Standard1/wait=0.5',
                              '%s-002-003' % self.ksname, retries=retries)
            yield deferwait(0.05)

            for retry in range(retries + 1):
                self.assertNumConnections(pool_size)
                self.assertNumWorkers(1)
                self.assertNotFired(d)
                self.killWorkingNode()
                yield deferwait(0.1)

            yield self.assertFailure(d, TTransport.TTransportException)

            self.flushLoggedErrors()

    @defer.inlineCallbacks
    def test_kill_pending_conns(self):
        num_nodes = pool_size = 8
        fake_pending = 2

        with self.cluster_and_pool(num_nodes=num_nodes, pool_size=1):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            # turn up pool size once other nodes are known
            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.1)

            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(pool_size)

            class fake_connector:
                def __init__(self, nodename):
                    self.node = nodename
                    self.stopped = False

                def stopFactory(self):
                    self.stopped = True

            fakes = [fake_connector('fake%02d' % n) for n in range(fake_pending)]
            # by putting them in connectors but not good_conns, these will
            # register as connection-pending
            self.pool.connectors.update(fakes)

            self.assertEqual(self.pool.num_pending_conns(), 2)
            self.pool.adjustPoolSize(pool_size)

            # the pending conns should have been killed first
            self.assertEqual(self.pool.num_pending_conns(), 0)
            self.assertEqual(self.pool.num_connectors(), pool_size)
            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(pool_size)

            for fk in fakes:
                self.assert_(fk.stopped, msg='Fake %s was not stopped!' % fk.node)

    @defer.inlineCallbacks
    def test_connection_leveling(self):
        num_nodes = 8
        conns_per_node = 10
        tolerance_factor = 0.20

        def assertConnsPerNode(numconns):
            tolerance = int(tolerance_factor * numconns)
            conns = self.cluster.get_connections()
            pernode = {}
            for node, nodeconns in groupby(sorted(conns), lambda (n,p): n):
                pernode[node] = len(list(nodeconns))
            for node, conns_here in pernode.items():
                self.assertApproximates(numconns, conns_here, tolerance,
                                        msg='Expected ~%r (+- %r) connections to %r,'
                                            ' but found %r. Whole map: %r'
                                            % (numconns, tolerance, node, conns_here,
                                               pernode))

        with self.cluster_and_pool(num_nodes=num_nodes, pool_size=1):
            pool_size = num_nodes * conns_per_node

            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            # turn up pool size once other nodes are known
            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.3)

            # make sure conns are (at least mostly) balanced
            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(num_nodes)

            assertConnsPerNode(conns_per_node)

            # kill a node and make sure connections are remade in a
            # balanced way
            node = self.killSomeNode()
            yield deferwait(0.6)

            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(num_nodes - 1)

            assertConnsPerNode(pool_size / (num_nodes - 1))

            # lower pool size, check that connections are killed in a
            # balanced way
            new_pool_size = pool_size - conns_per_node
            self.pool.adjustPoolSize(new_pool_size)
            yield deferwait(0.2)

            self.assertNumConnections(new_pool_size)
            self.assertNumUniqueConnections(num_nodes - 1)

            assertConnsPerNode(new_pool_size / (num_nodes - 1))

            # restart the killed node again and wait for the pool to notice
            # that it's up
            node.startService()
            yield deferwait(0.5)

            # raise pool size again, check balanced
            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.2)

            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(num_nodes)

            assertConnsPerNode(conns_per_node)

            self.flushLoggedErrors()

    def test_huge_pool(self):
        pass

    @defer.inlineCallbacks
    def test_manual_node_add(self):
        num_nodes = 3
        pool_size = 5

        class LyingCassanovaNode(cassanova.CassanovaNode):
            def endpoint_str(self):
                return '127.0.0.1:%d' % (self.addr.port + 1000)

        class LyingFakeCluster(FakeCassandraCluster):
            node_class = LyingCassanovaNode

        with self.cluster_and_pool(num_nodes=num_nodes, pool_size=1,
                                   cluster_class=LyingFakeCluster):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            self.pool.conn_timeout = 0.5

            # turn up pool size once other nodes are known
            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.2)

            # shouldn't have been able to find any nodes besides the seed
            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(1)

            # add address for a second real node, raise pool size so new
            # connections are made
            self.pool.addNode((self.cluster.iface, self.cluster.port + 1))
            self.pool.adjustPoolSize(pool_size * 2)
            yield deferwait(0.4)

            self.assertNumConnections(pool_size * 2)
            self.assertNumUniqueConnections(2)

            self.flushLoggedErrors()

    @defer.inlineCallbacks
    def test_manual_node_remove(self):
        num_nodes = 5
        pool_size = 10

        with self.cluster_and_pool(num_nodes=num_nodes, pool_size=1):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            # turn up pool size once other nodes are known
            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.2)

            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(num_nodes)

            n = iter(self.pool.nodes).next()
            self.pool.removeNode(n)
            yield deferwait(0.2)

            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(num_nodes - 1)

            # ask for one extra connection, to make sure the removed node
            # isn't re-added and connected to
            self.pool.adjustPoolSize(pool_size + 1)
            yield deferwait(0.1)

            self.assertNumConnections(pool_size + 1)
            self.assertNumUniqueConnections(num_nodes - 1)

    @defer.inlineCallbacks
    def test_conn_loss_during_idle(self):
        num_nodes = pool_size = 6

        with self.cluster_and_pool(num_nodes=num_nodes, pool_size=1):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            # turn up pool size once other nodes are known
            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.2)

            self.assertNumConnections(pool_size)
            self.assertNumUniqueConnections(pool_size)
            self.assertNumWorkers(0)

            self.killSomeConn()
            yield deferwait(0.1)

            self.assertNumConnections(pool_size)
            self.assertNumWorkers(0)

            self.killSomeNode()
            yield deferwait(0.1)

            conns = self.assertNumConnections(pool_size)
            uniqnodes = set(n for (n,p) in conns)
            self.assert_(len(uniqnodes) >= (num_nodes - 1),
                         msg='Expected %d or more unique connected nodes, but found %d'
                             % (num_nodes - 1, len(uniqnodes)))
            self.assertNumWorkers(0)

            self.flushLoggedErrors()

    @defer.inlineCallbacks
    def test_last_conn_loss_during_idle(self):
        with self.cluster_and_pool(pool_size=1, num_nodes=1):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            no_nodes_called = [False]
            def on_no_nodes(poolsize, targetsize, pendingreqs, expectedwait):
                self.assertEqual(poolsize, 0)
                self.assertEqual(targetsize, 1)
                self.assertEqual(pendingreqs, 0)
                no_nodes_called[0] = True
            self.pool.on_insufficient_nodes = on_no_nodes

            self.assertNumConnections(1)
            node = self.killSomeNode()
            yield deferwait(0.05)

            self.assert_(no_nodes_called[0], msg='on_no_nodes was not called')

            node.startService()
            d = self.pool.get('key004', 'Standard1', '%s-004-007' % self.ksname,
                              retries=2)
            addtimeout(d, 3.0)
            answer = yield d
            self.assertEqual(answer.column.value, 'val-%s-004-007' % self.ksname)

            self.flushLoggedErrors()

    @defer.inlineCallbacks
    def test_last_conn_loss_during_request(self):
        with self.cluster_and_pool(pool_size=1, num_nodes=1):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows()

            self.assertNumConnections(1)

            d = self.pool.get('key004', 'Standard1/wait=1.0',
                              '%s-004-008' % self.ksname, retries=4)
            yield deferwait(0.1)

            def cancel_if_no_conns(numconns, pending):
                numworkers = self.pool.num_working_conns()
                if numworkers == 0 and not d.called:
                    d.cancel()
            self.pool.on_insufficient_conns = cancel_if_no_conns

            self.assertNumWorkers(1)
            self.killWorkingNode()
            yield deferwait(0.05)

            self.assertFired(d)
            yield self.assertFailure(d, defer.CancelledError)

            self.flushLoggedErrors()

    @defer.inlineCallbacks
    def test_main_seed_down(self):
        with self.cluster_and_pool(pool_size=1, num_nodes=2):
            yield self.make_standard_cfs()
            yield self.insert_dumb_rows(numkeys=20)

            self.pool.adjustPoolSize(5)
            yield deferwait(0.1)
            self.assertNumConnections(5)
            self.assertNumUniqueConnections(2)

            # kill the first seed node
            startnode = [node for (node, proto) in self.cluster.get_connections()
                         if node.addr.port == self.start_port]
            startnode[0].stopService()

            # submit a bunch of read requests
            dlist = []
            keys = yield self.pool.get_range_slices('Standard1', start='',
                                                    count=10, column_count=0)
            for k in keys:
                d = self.pool.get_range_slices('Standard1', start=k.key, finish=k.key,
                                               column_count=10)
                dlist.append(d)

            yield defer.DeferredList(dlist, fireOnOneErrback=True)

            self.flushLoggedErrors()

    @defer.inlineCallbacks
    def test_lots_of_up_and_down(self):
        pool_size = 20
        num_nodes = 10
        num_ops = 500
        num_twiddles = 100
        runtime = 4.0
        ksname = 'KS'
        num_keys = 20

        @defer.inlineCallbacks
        def node_twiddler(optime, numops):
            end_time = time() + optime
            wait_per_op = float(optime) / numops
            log.msg('twiddler starting')
            while True:
                if time() > end_time:
                    break
                yield deferwait(random.normalvariate(wait_per_op, wait_per_op * 0.2))
                nodes = self.cluster.get_nodes()
                running_nodes = [n for n in nodes if n.running]
                nonrunning = [n for n in nodes if not n.running]
                if len(running_nodes) <= 1:
                    op = 'up'
                elif len(nonrunning) == 0:
                    op = 'down'
                else:
                    op = random.choice(('down', 'up'))
                if op == 'down':
                    random.choice(running_nodes).stopService()
                else:
                    random.choice(nonrunning).startService()
            log.msg('twiddler done')

        @defer.inlineCallbacks
        def work_o_tron(optime, numops, n):
            log.msg('work_o_tron %d started' % n)
            end_time = time() + optime
            wait_per_op = float(optime) / numops
            opsdone = 0
            while True:
                if time() > end_time:
                    break
                thiswait = random.normalvariate(wait_per_op, wait_per_op * 0.2)
                keynum = random.randint(0, num_keys - 1)
                log.msg('work_o_tron %d getting key%03d, waiting %f' % (n, keynum, thiswait))
                d = self.pool.get('key%03d' % keynum, 'Standard1/wait=%f' % thiswait,
                                  '%s-%03d-001' % (ksname, keynum),
                                  retries=10)
                result = yield d
                log.msg('work_o_tron %d got %r' % (n, result))
                self.assertEqual(result.column.value, 'val-%s-%03d-001' % (ksname, keynum))
                opsdone += 1
            log.msg('work_o_tron %d done' % n)
            self.assertApproximates(opsdone, numops, 0.5 * numops)

        starttime = time()
        with self.cluster_and_pool(pool_size=1, num_nodes=num_nodes,
                                   api_version=translate.CASSANDRA_08_VERSION):
            yield self.make_standard_cfs(ksname)
            yield self.insert_dumb_rows(ksname, numkeys=num_keys)

            self.pool.adjustPoolSize(pool_size)
            yield deferwait(0.5)

            twiddler = node_twiddler(runtime, num_twiddles)
            workers = [work_o_tron(runtime, num_ops / pool_size, n)
                       for n in range(pool_size)]

            end = yield defer.DeferredList([twiddler] + workers, fireOnOneErrback=True)
            for num, (succ, result) in enumerate(end):
                self.assert_(succ, msg='worker %d failed: result: %s' % (num, result))
            endtime = time()

            self.assertApproximates(endtime - starttime, runtime, 0.5 * runtime)
            self.flushLoggedErrors()

if cassanova:
    class EnhancedCassanovaInterface(cassanova.CassanovaInterface):
        """
        Add a way to request operations which are guaranteed to take (at least) a
        given amount of time, for easier testing of things which might take a long
        time in the real world
        """

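        # The wait is encoded in the column family component of the path; e.g.
        # a get on 'Standard1/wait=0.5' is served from the real 'Standard1' CF,
        # but the result is delayed by at least 0.5 seconds.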
        def get(self, key, column_path, consistency_level):
            args = []
            if '/' in column_path.column_family:
                parts = column_path.column_family.split('/')
                column_path.column_family = parts[0]
                args = parts[1:]
            d = defer.maybeDeferred(cassanova.CassanovaInterface.get, self, key,
                                    column_path, consistency_level)
            waittime = 0
            for arg in args:
                if arg.startswith('wait='):
                    waittime += float(arg[5:])
            if waittime > 0:
                def doWait(x):
                    waiter = deferwait(waittime, x)
                    self.service.waiters.append(waiter)
                    return waiter
                d.addCallback(doWait)
            return d

    class EnhancedCassanovaFactory(cassanova.CassanovaFactory):
        handler_factory = EnhancedCassanovaInterface

    class EnhancedCassanovaNode(cassanova.CassanovaNode):
        factory = EnhancedCassanovaFactory

        def endpoint_str(self):
            return '%s:%d' % (self.addr.host, self.addr.port)

    class FakeCassandraCluster(cassanova.CassanovaService):
        """
        Tweak the standard Cassanova service to allow nodes to run on the same
        interface, but different ports. CassandraClusterPool already knows how
        to understand the 'host:port' type of endpoint description in
        describe_ring output.
        """

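        # Each fake node is a full Cassanova node listening on its own port;
        # ports are allocated consecutively starting at start_port, all on the
        # single configured interface.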
        node_class = EnhancedCassanovaNode

        def __init__(self, num_nodes, start_port=41356, interface='127.0.0.1'):
            cassanova.CassanovaService.__init__(self, start_port)
            self.waiters = []
            self.iface = interface
            for n in range(num_nodes):
                self.add_node_on_port(start_port + n)
            # make a non-system keyspace so that describe_ring can work
            self.keyspaces['dummy'] = cassanova.KsDef(
                'dummy',
                replication_factor=1,
                strategy_class='org.apache.cassandra.locator.SimpleStrategy',
                cf_defs=[]
            )

        def add_node_on_port(self, port, token=None):
            node = self.node_class(port, self.iface, token=token)
            node.setServiceParent(self)
            self.ring[node.mytoken] = node

        def stopService(self):
            cassanova.CassanovaService.stopService(self)
            for d in self.waiters:
                if not d.called:
                    d.cancel()
                    d.addErrback(lambda n: None)

if not cassanova:
    del CassandraClusterPoolTest