Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch '1.0'

  • Loading branch information...
commit ff21c6f66896de2f520f6116421a7a8961eae850 2 parents 06199f8 + 6ac2289
@driftx authored
View
12 debian/changelog
@@ -1,3 +1,15 @@
+python-telephus (1.0.0~beta1) unstable; urgency=low
+
+ * paul cannon <paul@datastax.com>:
+ + provide require_api_version kwarg, not api_version
+ + rip out CQL support
+ + fix setting consistency on CCP instances
+ + allow deb to work with python2.7
+ * Nick Bailey <nickmbailey@gmail.com>:
+ + Update thrift definitions for optional rpc_endpoints attribute
+
+ -- paul cannon <paul@datastax.com> Sat, 07 Jan 2012 23:27:12 -0600
+
python-telephus (0.8.0~beta1) unstable; urgency=low
* Tyler Hobbs <tyler@datastax.com>:
View
7 example.py
@@ -1,8 +1,7 @@
#!/usr/bin/python
from telephus.protocol import ManagedCassandraClientFactory
from telephus.client import CassandraClient
-from telephus.cassandra.ttypes import (ColumnPath, ColumnParent, Column,
- SuperColumn, Compression)
+from telephus.cassandra.ttypes import ColumnPath, ColumnParent, Column, SuperColumn
from twisted.internet import defer
HOST = 'localhost'
@@ -48,10 +47,6 @@ def dostuff(client):
column='testcounter', super_column='testsuper')
print 'get super counter value', res
- res = yield client.execute_cql_query("SELECT * FROM %s WHERE KEY = %s" % (CF, 'test'.encode('hex')),
- compression=Compression.NONE)
- print 'execute_cql_query', res
-
# batch insert will figure out if you're trying a CF or SCF
# from the data structure
res = yield client.batch_insert(key='test', column_family=CF, mapping={colname: 'bar'})
View
2  telephus/cassandra/c08/constants.py
@@ -1 +1 @@
-VERSION = "19.10.0"
+VERSION = "19.16.0"
View
135 telephus/cassandra/c08/ttypes.py
@@ -2026,10 +2026,18 @@ def __ne__(self, other):
class TokenRange:
"""
+ A TokenRange describes part of the Cassandra ring, it is a mapping from a range to
+ endpoints responsible for that range.
+ @param start_token The first token in the range
+ @param end_token The last token in the range
+ @param endpoints The endpoints responsible for the range (listed by their configured listen_address)
+ @param rpc_endpoints The endpoints responsible for the range (listed by their configured rpc_address)
+
Attributes:
- start_token
- end_token
- endpoints
+ - rpc_endpoints
"""
thrift_spec = (
@@ -2037,12 +2045,14 @@ class TokenRange:
(1, TType.STRING, 'start_token', None, None, ), # 1
(2, TType.STRING, 'end_token', None, None, ), # 2
(3, TType.LIST, 'endpoints', (TType.STRING,None), None, ), # 3
+ (4, TType.LIST, 'rpc_endpoints', (TType.STRING,None), None, ), # 4
)
- def __init__(self, start_token=None, end_token=None, endpoints=None,):
+ def __init__(self, start_token=None, end_token=None, endpoints=None, rpc_endpoints=None,):
self.start_token = start_token
self.end_token = end_token
self.endpoints = endpoints
+ self.rpc_endpoints = rpc_endpoints
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -2073,6 +2083,16 @@ def read(self, iprot):
iprot.readListEnd()
else:
iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.LIST:
+ self.rpc_endpoints = []
+ (_etype44, _size41) = iprot.readListBegin()
+ for _i45 in xrange(_size41):
+ _elem46 = iprot.readString();
+ self.rpc_endpoints.append(_elem46)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -2094,8 +2114,15 @@ def write(self, oprot):
if self.endpoints != None:
oprot.writeFieldBegin('endpoints', TType.LIST, 3)
oprot.writeListBegin(TType.STRING, len(self.endpoints))
- for iter41 in self.endpoints:
- oprot.writeString(iter41)
+ for iter47 in self.endpoints:
+ oprot.writeString(iter47)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.rpc_endpoints != None:
+ oprot.writeFieldBegin('rpc_endpoints', TType.LIST, 4)
+ oprot.writeListBegin(TType.STRING, len(self.rpc_endpoints))
+ for iter48 in self.rpc_endpoints:
+ oprot.writeString(iter48)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -2149,11 +2176,11 @@ def read(self, iprot):
if fid == 1:
if ftype == TType.MAP:
self.credentials = {}
- (_ktype43, _vtype44, _size42 ) = iprot.readMapBegin()
- for _i46 in xrange(_size42):
- _key47 = iprot.readString();
- _val48 = iprot.readString();
- self.credentials[_key47] = _val48
+ (_ktype50, _vtype51, _size49 ) = iprot.readMapBegin()
+ for _i53 in xrange(_size49):
+ _key54 = iprot.readString();
+ _val55 = iprot.readString();
+ self.credentials[_key54] = _val55
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -2170,9 +2197,9 @@ def write(self, oprot):
if self.credentials != None:
oprot.writeFieldBegin('credentials', TType.MAP, 1)
oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.credentials))
- for kiter49,viter50 in self.credentials.items():
- oprot.writeString(kiter49)
- oprot.writeString(viter50)
+ for kiter56,viter57 in self.credentials.items():
+ oprot.writeString(kiter56)
+ oprot.writeString(viter57)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -2439,11 +2466,11 @@ def read(self, iprot):
elif fid == 13:
if ftype == TType.LIST:
self.column_metadata = []
- (_etype54, _size51) = iprot.readListBegin()
- for _i55 in xrange(_size51):
- _elem56 = ColumnDef()
- _elem56.read(iprot)
- self.column_metadata.append(_elem56)
+ (_etype61, _size58) = iprot.readListBegin()
+ for _i62 in xrange(_size58):
+ _elem63 = ColumnDef()
+ _elem63.read(iprot)
+ self.column_metadata.append(_elem63)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -2571,8 +2598,8 @@ def write(self, oprot):
if self.column_metadata != None:
oprot.writeFieldBegin('column_metadata', TType.LIST, 13)
oprot.writeListBegin(TType.STRUCT, len(self.column_metadata))
- for iter57 in self.column_metadata:
- iter57.write(oprot)
+ for iter64 in self.column_metadata:
+ iter64.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.gc_grace_seconds != None:
@@ -2664,6 +2691,7 @@ class KsDef:
- strategy_options
- replication_factor: @deprecated
- cf_defs
+ - durable_writes
"""
thrift_spec = (
@@ -2673,14 +2701,16 @@ class KsDef:
(3, TType.MAP, 'strategy_options', (TType.STRING,None,TType.STRING,None), None, ), # 3
(4, TType.I32, 'replication_factor', None, None, ), # 4
(5, TType.LIST, 'cf_defs', (TType.STRUCT,(CfDef, CfDef.thrift_spec)), None, ), # 5
+ (6, TType.BOOL, 'durable_writes', None, True, ), # 6
)
- def __init__(self, name=None, strategy_class=None, strategy_options=None, replication_factor=None, cf_defs=None,):
+ def __init__(self, name=None, strategy_class=None, strategy_options=None, replication_factor=None, cf_defs=None, durable_writes=thrift_spec[6][4],):
self.name = name
self.strategy_class = strategy_class
self.strategy_options = strategy_options
self.replication_factor = replication_factor
self.cf_defs = cf_defs
+ self.durable_writes = durable_writes
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -2704,11 +2734,11 @@ def read(self, iprot):
elif fid == 3:
if ftype == TType.MAP:
self.strategy_options = {}
- (_ktype59, _vtype60, _size58 ) = iprot.readMapBegin()
- for _i62 in xrange(_size58):
- _key63 = iprot.readString();
- _val64 = iprot.readString();
- self.strategy_options[_key63] = _val64
+ (_ktype66, _vtype67, _size65 ) = iprot.readMapBegin()
+ for _i69 in xrange(_size65):
+ _key70 = iprot.readString();
+ _val71 = iprot.readString();
+ self.strategy_options[_key70] = _val71
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -2720,14 +2750,19 @@ def read(self, iprot):
elif fid == 5:
if ftype == TType.LIST:
self.cf_defs = []
- (_etype68, _size65) = iprot.readListBegin()
- for _i69 in xrange(_size65):
- _elem70 = CfDef()
- _elem70.read(iprot)
- self.cf_defs.append(_elem70)
+ (_etype75, _size72) = iprot.readListBegin()
+ for _i76 in xrange(_size72):
+ _elem77 = CfDef()
+ _elem77.read(iprot)
+ self.cf_defs.append(_elem77)
iprot.readListEnd()
else:
iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.BOOL:
+ self.durable_writes = iprot.readBool();
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -2749,9 +2784,9 @@ def write(self, oprot):
if self.strategy_options != None:
oprot.writeFieldBegin('strategy_options', TType.MAP, 3)
oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.strategy_options))
- for kiter71,viter72 in self.strategy_options.items():
- oprot.writeString(kiter71)
- oprot.writeString(viter72)
+ for kiter78,viter79 in self.strategy_options.items():
+ oprot.writeString(kiter78)
+ oprot.writeString(viter79)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.replication_factor != None:
@@ -2761,10 +2796,14 @@ def write(self, oprot):
if self.cf_defs != None:
oprot.writeFieldBegin('cf_defs', TType.LIST, 5)
oprot.writeListBegin(TType.STRUCT, len(self.cf_defs))
- for iter73 in self.cf_defs:
- iter73.write(oprot)
+ for iter80 in self.cf_defs:
+ iter80.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
+ if self.durable_writes != None:
+ oprot.writeFieldBegin('durable_writes', TType.BOOL, 6)
+ oprot.writeBool(self.durable_writes)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
@@ -2824,11 +2863,11 @@ def read(self, iprot):
elif fid == 2:
if ftype == TType.LIST:
self.columns = []
- (_etype77, _size74) = iprot.readListBegin()
- for _i78 in xrange(_size74):
- _elem79 = Column()
- _elem79.read(iprot)
- self.columns.append(_elem79)
+ (_etype84, _size81) = iprot.readListBegin()
+ for _i85 in xrange(_size81):
+ _elem86 = Column()
+ _elem86.read(iprot)
+ self.columns.append(_elem86)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -2849,8 +2888,8 @@ def write(self, oprot):
if self.columns != None:
oprot.writeFieldBegin('columns', TType.LIST, 2)
oprot.writeListBegin(TType.STRUCT, len(self.columns))
- for iter80 in self.columns:
- iter80.write(oprot)
+ for iter87 in self.columns:
+ iter87.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -2911,11 +2950,11 @@ def read(self, iprot):
elif fid == 2:
if ftype == TType.LIST:
self.rows = []
- (_etype84, _size81) = iprot.readListBegin()
- for _i85 in xrange(_size81):
- _elem86 = CqlRow()
- _elem86.read(iprot)
- self.rows.append(_elem86)
+ (_etype91, _size88) = iprot.readListBegin()
+ for _i92 in xrange(_size88):
+ _elem93 = CqlRow()
+ _elem93.read(iprot)
+ self.rows.append(_elem93)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -2941,8 +2980,8 @@ def write(self, oprot):
if self.rows != None:
oprot.writeFieldBegin('rows', TType.LIST, 2)
oprot.writeListBegin(TType.STRUCT, len(self.rows))
- for iter87 in self.rows:
- iter87.write(oprot)
+ for iter94 in self.rows:
+ iter94.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.num != None:
View
5 telephus/client.py
@@ -226,11 +226,6 @@ def batch_mutate(self, mutationmap=None, timestamp=None, consistency=None, retri
req = ManagedThriftRequest('batch_mutate', mutmap, consistency)
return self.manager.pushRequest(req, retries=retries)
- @requirekwargs('query')
- def execute_cql_query(self, query=None, compression=Compression.GZIP, retries=None):
- req = ManagedThriftRequest('execute_cql_query', query, compression)
- return self.manager.pushRequest(req, retries=retries)
-
def _mk_cols_or_supers(self, mapping, timestamp, ttl=None, make_deletions=False):
if isinstance(mapping, list):
return mapping
View
40 telephus/pool.py
@@ -59,7 +59,7 @@
from telephus.cassandra.c08 import Cassandra as Cassandra08
from telephus.cassandra.ttypes import *
from telephus.client import CassandraClient
-from telephus.translate import (getAPIVersion, translateArgs,
+from telephus.translate import (thrift_api_ver_to_cassandra_ver, translateArgs,
postProcess)
noop = lambda *a, **kw: None
@@ -214,13 +214,16 @@ def prep_connection(self, creds=None, keyspace=None):
errbacked if something goes wrong.
"""
- d = defer.succeed(0)
- if self.api_version is None:
- d.addCallback(lambda _: self.my_describe_version())
- d.addCallback(getAPIVersion)
- def set_version(ver):
- self.version = ver
- d.addCallback(set_version)
+ d = self.my_describe_version()
+ def check_version(thrift_ver):
+ cassver = thrift_api_ver_to_cassandra_ver(thrift_ver)
+ if self.api_version is None:
+ self.api_version = cassver
+ elif self.api_version != cassver:
+ raise APIMismatch("%s is exposing thrift protocol version %s -> "
+ "Cassandra version %s, but %s was expected"
+ % (self.node, thrift_ver, cassver, self.api_version))
+ d.addCallback(check_version)
if creds is not None:
d.addCallback(lambda _: self.my_login(creds))
if keyspace is not None:
@@ -519,7 +522,7 @@ def __cmp__(self, other):
def __hash__(self):
return hash((self.__class__, self.host, self.port))
-class CassandraClusterPool(service.Service):
+class CassandraClusterPool(service.Service, object):
"""
Manage a pool of connections to nodes in a Cassandra cluster.
@@ -594,7 +597,7 @@ class CassandraClusterPool(service.Service):
def __init__(self, seed_list, keyspace=None, creds=None, thrift_port=None,
pool_size=None, conn_timeout=10, bind_address=None,
- log_cb=log.msg, reactor=None, api_version=None):
+ log_cb=log.msg, reactor=None, require_api_version=None):
"""
Initialize a CassandraClusterPool.
@@ -638,10 +641,15 @@ def __init__(self, seed_list, keyspace=None, creds=None, thrift_port=None,
@param reactor: The reactor instance to use when starting thrift
connections or setting timers.
- @param api_version: Whether the thrift API version number should be
- checked against the one Telephus understands upon originating any
- connection. If the versions are not compatible, the connection to
- that node will be aborted. Default: no
+ @param require_api_version: If not None, Telephus will require that
+ all connections conform to the API for the given Cassandra version.
+ Possible values are "0.7", "0.8", "1.0", etc.
+
+ If None, Telephus will consider all supported API versions to be
+ acceptable.
+
+ If the api version reported by a remote node is not compatible, the
+ connection to that node will be aborted. Default: None
"""
self.seed_list = list(seed_list)
@@ -657,7 +665,7 @@ def __init__(self, seed_list, keyspace=None, creds=None, thrift_port=None,
self.keyspace = keyspace
self.creds = creds
self.request_queue = defer.DeferredQueue()
- self.api_version = None
+ self.require_api_version = require_api_version
self.future_fill_pool = None
self.removed_nodes = set()
self._client_instance = CassandraClient(self)
@@ -940,7 +948,7 @@ def schedule_future_fill_pool(self, seconds):
def make_conn(self, node):
self.log('Adding connection to %s' % (node,))
- f = self.conn_factory(node, self, self.api_version)
+ f = self.conn_factory(node, self, self.require_api_version)
bindaddr=self.bind_address
if bindaddr is not None and isinstance(bindaddr, str):
bindaddr = (bindaddr, 0)
View
25 telephus/protocol.py
@@ -40,13 +40,16 @@ def connectionMade(self):
.addCallbacks(self.setupComplete, self.setupFailed)
def setupConnection(self):
- d = defer.succeed(True)
- if self.api_version == None:
- d.addCallback(lambda _: self.client.describe_version())
- d.addCallback(translate.getAPIVersion)
- def set_version(ver):
- self.api_version = ver
- d.addCallback(set_version)
+ d = self.client.describe_version()
+ def check_version(thrift_ver):
+ cver = translate.thrift_api_ver_to_cassandra_ver(thrift_ver)
+ if self.api_version is None:
+ self.api_version = cver
+ elif self.api_version != cver:
+ raise APIMismatch("%s is exposing thrift protocol version %s -> "
+ "Cassandra version %s, but %s was expected"
+ % (self.transport.getPeer(), thrift_ver, cver, self.api_version))
+ d.addCallback(check_version)
if self.keyspace:
d.addCallback(lambda _: self.client.set_keyspace(self.keyspace))
return d
@@ -105,7 +108,7 @@ class ManagedCassandraClientFactory(ReconnectingClientFactory):
thriftFactory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory
protocol = ManagedThriftClientProtocol
- def __init__(self, keyspace=None, retries=0, credentials={}, api_version=None):
+ def __init__(self, keyspace=None, retries=0, credentials={}, require_api_version=None):
self.deferred = defer.Deferred()
self.queue = defer.DeferredQueue()
self.continueTrying = True
@@ -116,7 +119,7 @@ def __init__(self, keyspace=None, retries=0, credentials={}, api_version=None):
self.credentials = credentials
if credentials:
self.protocol = AuthenticatedThriftClientProtocol
- self.api_version = api_version
+ self.require_api_version = require_api_version
def _errback(self, reason=None):
if self.deferred:
@@ -146,11 +149,11 @@ def buildProtocol(self, addr):
p = self.protocol(self.keyspace,
self.credentials,
self.thriftFactory(),
- api_version=self.api_version)
+ api_version=self.require_api_version)
else:
p = self.protocol(self.thriftFactory(),
keyspace=self.keyspace,
- api_version=self.api_version)
+ api_version=self.require_api_version)
p.factory = self
return p
View
2  telephus/translate.py
@@ -9,7 +9,7 @@
class APIMismatch(Exception):
pass
-def getAPIVersion(remoteversion):
+def thrift_api_ver_to_cassandra_ver(remoteversion):
"""
Try to determine if the remote thrift api version is likely to work the
way we expect it to. A mismatch in major version number will definitely
View
67 test/test_cassandraclient.py
@@ -5,9 +5,8 @@
from telephus.client import CassandraClient
from telephus import translate
from telephus.cassandra.ttypes import *
-from telephus.translate import getAPIVersion, CASSANDRA_08_VERSION
+from telephus.translate import thrift_api_ver_to_cassandra_ver, CASSANDRA_08_VERSION
import os
-import zlib
CONNS = 5
@@ -39,7 +38,7 @@ def setUp(self):
yield self.cmanager.deferred
remote_ver = yield self.client.describe_version()
- self.version = getAPIVersion(remote_ver)
+ self.version = thrift_api_ver_to_cassandra_ver(remote_ver)
self.my_keyspace = KsDef(
name=KEYSPACE,
@@ -247,24 +246,6 @@ def test_counter_remove(self):
column='col', super_column='scol'),
NotFoundException)
- @defer.inlineCallbacks
- def test_cql(self):
- if self.version != CASSANDRA_08_VERSION:
- raise unittest.SkipTest('CQL is not supported in 0.7')
-
- yield self.client.insert('test', CF, 'testval', column='col1')
- res = yield self.client.get('test', CF, column='col1')
- self.assertEquals(res.column.value, 'testval')
-
- query = 'SELECT * from %s where KEY = %s' % (CF, 'test'.encode('hex'))
- uncompressed_result = yield self.client.execute_cql_query(query, Compression.NONE)
- self.assertEquals(uncompressed_result.rows[0].columns[0].name, 'col1')
- self.assertEquals(uncompressed_result.rows[0].columns[0].value, 'testval')
-
- compressed_query = zlib.compress(query)
- compressed_result = yield self.client.execute_cql_query(compressed_query, Compression.GZIP)
- self.assertEquals(uncompressed_result, compressed_result)
-
def sleep(self, secs):
d = defer.Deferred()
reactor.callLater(secs, d.callback, None)
@@ -362,6 +343,8 @@ def test_column_family_manipulation(self):
)
post_07_fields = ['replicate_on_write', 'merge_shards_chance',
'key_validation_class', 'row_cache_provider', 'key_alias']
+ post_08_fields = ['memtable_throughput_in_mb', 'memtable_flush_after_mins', 'memtable_operations_in_millions']
+
yield self.client.system_add_column_family(cfdef)
ksdef = yield self.client.describe_keyspace(KEYSPACE)
@@ -372,6 +355,11 @@ def test_column_family_manipulation(self):
setattr(cfdef, field, None)
setattr(cfdef2, field, None)
+ for field in post_08_fields:
+ # These fields change from 0.8 to 1.0
+ setattr(cfdef, field, None)
+ setattr(cfdef2, field, None)
+
# we don't know the id ahead of time. copy the new one so the equality
# comparison won't fail
cfdef.id = cfdef2.id
@@ -441,18 +429,35 @@ def test_initial_connection_failure(self):
cmanager = ManagedCassandraClientFactory()
client = CassandraClient(cmanager)
d = cmanager.deferred
- reactor.connectTCP('nonexistent-host.000-', PORT, cmanager)
+ reactor.connectTCP('nonexistent.example.com', PORT, cmanager)
yield self.assertFailure(d, error.DNSLookupError)
cmanager.shutdown()
@defer.inlineCallbacks
def test_api_match(self):
- for version in [translate.CASSANDRA_07_VERSION, translate.CASSANDRA_08_VERSION, None]:
- cmanager = ManagedCassandraClientFactory(api_version=version)
- client = CassandraClient(cmanager)
- d = cmanager.deferred
- conn = reactor.connectTCP(HOST, PORT, cmanager)
- yield d
- # do something innocuous, make sure connection is good
- yield client.describe_schema_versions()
- yield cmanager.shutdown()
+ cmanager = ManagedCassandraClientFactory(require_api_version=None)
+ client = CassandraClient(cmanager)
+ d = cmanager.deferred
+ conn = reactor.connectTCP(HOST, PORT, cmanager)
+ yield d
+ yield client.describe_schema_versions()
+ api_ver = cmanager._protos[0].api_version
+ yield cmanager.shutdown()
+
+ # try with the right version explicitly required
+ cmanager = ManagedCassandraClientFactory(require_api_version=api_ver)
+ client = CassandraClient(cmanager)
+ d = cmanager.deferred
+ conn = reactor.connectTCP(HOST, PORT, cmanager)
+ yield d
+ yield client.describe_schema_versions()
+ yield cmanager.shutdown()
+
+ # try with a mismatching version explicitly required
+ bad_ver = [v for (_, v) in translate.supported_versions if v != api_ver][0]
+ cmanager = ManagedCassandraClientFactory(require_api_version=bad_ver)
+ client = CassandraClient(cmanager)
+ d = cmanager.deferred
+ conn = reactor.connectTCP(HOST, PORT, cmanager)
+ yield self.assertFailure(d, translate.APIMismatch)
+ yield cmanager.shutdown()
Please sign in to comment.
Something went wrong with that request. Please try again.