From df1a457304c3199de6c5d7588804917806406c9d Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Fri, 20 Sep 2013 22:19:47 +0200 Subject: [PATCH 1/5] Allow user to specify values for blob columns using bytearray. This way the client also work with Cassandra 2.0. --- cassandra/decoder.py | 7 ++++++ tests/integration/test_types.py | 43 +++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/cassandra/decoder.py b/cassandra/decoder.py index eb807ab09a..39de2ca563 100644 --- a/cassandra/decoder.py +++ b/cassandra/decoder.py @@ -783,6 +783,12 @@ def cql_encode_str(val): return cql_quote(val) +def cql_encode_bytes(val): + hex_val = ''.join('%02x' % byte for byte in val) + hex_val = '0x' + hex_val + return hex_val + + def cql_encode_object(val): return str(val) @@ -815,6 +821,7 @@ def cql_encode_set_collection(val): cql_encoders = { float: cql_encode_object, + bytearray: cql_encode_bytes, str: cql_encode_str, unicode: cql_encode_unicode, types.NoneType: cql_encode_none, diff --git a/tests/integration/test_types.py b/tests/integration/test_types.py index 9da3057931..5c8cf0fa90 100644 --- a/tests/integration/test_types.py +++ b/tests/integration/test_types.py @@ -9,9 +9,52 @@ from blist import sortedset +from cassandra import InvalidRequest from cassandra.cluster import Cluster class TypeTests(unittest.TestCase): + def test_blob_type(self): + c = Cluster() + s = c.connect() + s.execute(""" + CREATE KEYSPACE IF NOT EXISTS typetests_blob + WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'} + """) + s.set_keyspace("typetests_blob") + s.execute(""" + CREATE TABLE IF NOT EXISTS mytable ( + a ascii, + b blob, + PRIMARY KEY (a) + ) + """) + + params = [ + 'key1', + 'blob' + ] + + # Invalid type, blob can't be specified as a string + query = 'INSERT INTO mytable (a, b) VALUES (%s, %s)' + msg = r'.*Invalid STRING constant \(blob\) for b of type blob.*' + self.assertRaisesRegexp(InvalidRequest, msg, s.execute, query, params) + + # Valid types + params = [ + 'key2', + bytearray('blob1', 'hex') + ] + s.execute(query, params) + + expected_vals = [ + 'key2', + bytearray('blob1', 'hex') + ] + + results = s.execute("SELECT * FROM mytable") + + for expected, actual in zip(expected_vals, results[0]): + self.assertEquals(expected, actual) def test_basic_types(self): c = Cluster() From 1ae35132e82a0bcbd712749e5b5cd83bf030e8e6 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Fri, 20 Sep 2013 22:28:30 +0200 Subject: [PATCH 2/5] Allow user to pass 'cql_version' attribute to Cluster constructor. --- cassandra/cluster.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 10fd1c4bbb..4d21c2fe34 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -201,6 +201,7 @@ def __init__(self, metrics_enabled=False, connection_class=None, sockopts=None, + cql_version=None, executor_threads=2, max_schema_agreement_wait=10): """ @@ -236,6 +237,7 @@ def __init__(self, self.metrics_enabled = metrics_enabled self.sockopts = sockopts + self.cql_version = cql_version self.max_schema_agreement_wait = max_schema_agreement_wait # let Session objects be GC'ed (and shutdown) when the user no longer @@ -316,6 +318,7 @@ def connection_factory(self, address, *args, **kwargs): kwargs['port'] = self.port kwargs['compression'] = self.compression kwargs['sockopts'] = self.sockopts + kwargs['cql_version'] = self.cql_version return self.connection_class.factory(address, *args, **kwargs) @@ -326,6 +329,7 @@ def _make_connection_factory(self, host, *args, **kwargs): kwargs['port'] = self.port kwargs['compression'] = self.compression kwargs['sockopts'] = self.sockopts + kwargs['cql_version'] = self.cql_version return partial(self.connection_class.factory, host.address, *args, **kwargs) From 60dd6339600bd32066f43b25496c31e6d900ece9 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Fri, 20 Sep 2013 22:59:47 +0200 Subject: [PATCH 3/5] Add tests for new blob writing functionality. --- tests/integration/test_types.py | 93 +++++++++++++++++++++++++-------- 1 file changed, 72 insertions(+), 21 deletions(-) diff --git a/tests/integration/test_types.py b/tests/integration/test_types.py index 5c8cf0fa90..5a1b0938bf 100644 --- a/tests/integration/test_types.py +++ b/tests/integration/test_types.py @@ -10,19 +10,41 @@ from blist import sortedset from cassandra import InvalidRequest -from cassandra.cluster import Cluster +from cassandra.cluster import Cluster, NoHostAvailable + +CQL_TO_VERSION_MAP = { + '3.1.0': '2.0', + '3.0.5': '1.2' +} class TypeTests(unittest.TestCase): - def test_blob_type(self): + def setUp(self): + super(TypeTests, self).setUp() + + # TODO: There must be a nicer way to do this + for cql_version, cass_version in CQL_TO_VERSION_MAP.items(): + c = Cluster(cql_version=cql_version) + + try: + s = c.connect() + except NoHostAvailable, e: + message = e.errors['127.0.0.1'].message + if not 'is not supported by' in message: + raise e + else: + self._cass_version = cass_version + + def test_blob_type_as_string(self): c = Cluster() s = c.connect() + s.execute(""" - CREATE KEYSPACE IF NOT EXISTS typetests_blob + CREATE KEYSPACE typetests_blob1 WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'} """) - s.set_keyspace("typetests_blob") + s.set_keyspace("typetests_blob1") s.execute(""" - CREATE TABLE IF NOT EXISTS mytable ( + CREATE TABLE mytable ( a ascii, b blob, PRIMARY KEY (a) @@ -31,23 +53,56 @@ def test_blob_type(self): params = [ 'key1', - 'blob' + 'blobyblob'.encode('hex') ] - # Invalid type, blob can't be specified as a string query = 'INSERT INTO mytable (a, b) VALUES (%s, %s)' - msg = r'.*Invalid STRING constant \(blob\) for b of type blob.*' - self.assertRaisesRegexp(InvalidRequest, msg, s.execute, query, params) - # Valid types + if self._cass_version == '2.0': + # Blob values can't be specified using string notation in CQL 3.1.0 and + # above which is used by default in Cassandra 2.0. + msg = r'.*Invalid STRING constant \(.*?\) for b of type blob.*' + self.assertRaisesRegexp(InvalidRequest, msg, s.execute, query, params) + return + + s.execute(query, params) + expected_vals = [ + 'key1', + 'blobyblob' + ] + + results = s.execute("SELECT * FROM mytable") + + for expected, actual in zip(expected_vals, results[0]): + self.assertEquals(expected, actual) + + def test_blob_type_as_bytearray(self): + c = Cluster() + s = c.connect() + + s.execute(""" + CREATE KEYSPACE typetests_blob2 + WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'} + """) + s.set_keyspace("typetests_blob2") + s.execute(""" + CREATE TABLE mytable ( + a ascii, + b blob, + PRIMARY KEY (a) + ) + """) + params = [ - 'key2', + 'key1', bytearray('blob1', 'hex') ] + + query = 'INSERT INTO mytable (a, b) VALUES (%s, %s);' s.execute(query, params) expected_vals = [ - 'key2', + 'key1', bytearray('blob1', 'hex') ] @@ -70,7 +125,6 @@ def test_basic_types(self): b text, c ascii, d bigint, - e blob, f boolean, g decimal, h double, @@ -98,7 +152,6 @@ def test_basic_types(self): "sometext", "ascii", # ascii 12345678923456789, # bigint - "blob".encode('hex'), # blob True, # boolean Decimal('1.234567890123456789'), # decimal 0.000244140625, # double @@ -120,7 +173,6 @@ def test_basic_types(self): "sometext", "ascii", # ascii 12345678923456789, # bigint - "blob", # blob True, # boolean Decimal('1.234567890123456789'), # decimal 0.000244140625, # double @@ -138,8 +190,8 @@ def test_basic_types(self): ) s.execute(""" - INSERT INTO mytable (a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + INSERT INTO mytable (a, b, c, d, f, g, h, i, j, k, l, m, n, o, p, q, r, s) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """, params) results = s.execute("SELECT * FROM mytable") @@ -149,11 +201,10 @@ def test_basic_types(self): # try the same thing with a prepared statement prepared = s.prepare(""" - INSERT INTO mytable (a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO mytable (a, b, c, d, f, g, h, i, j, k, l, m, n, o, p, q, r, s) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """) - params[4] = 'blob' s.execute(prepared.bind(params)) results = s.execute("SELECT * FROM mytable") @@ -163,7 +214,7 @@ def test_basic_types(self): # query with prepared statement prepared = s.prepare(""" - SELECT a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s FROM mytable + SELECT a, b, c, d, f, g, h, i, j, k, l, m, n, o, p, q, r, s FROM mytable """) results = s.execute(prepared.bind(())) From f1f329d0b6541b9a9d357c26dfa397e20e4b2a76 Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Sep 2013 15:55:37 +0200 Subject: [PATCH 4/5] Use a better way of detecting Cassandra and CQL version. Instead of using cql_version Cluster constructor argument query the "system.local" table. --- tests/integration/test_types.py | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/tests/integration/test_types.py b/tests/integration/test_types.py index 5a1b0938bf..8c435e6f52 100644 --- a/tests/integration/test_types.py +++ b/tests/integration/test_types.py @@ -12,27 +12,18 @@ from cassandra import InvalidRequest from cassandra.cluster import Cluster, NoHostAvailable -CQL_TO_VERSION_MAP = { - '3.1.0': '2.0', - '3.0.5': '1.2' -} class TypeTests(unittest.TestCase): def setUp(self): super(TypeTests, self).setUp() - # TODO: There must be a nicer way to do this - for cql_version, cass_version in CQL_TO_VERSION_MAP.items(): - c = Cluster(cql_version=cql_version) - - try: - s = c.connect() - except NoHostAvailable, e: - message = e.errors['127.0.0.1'].message - if not 'is not supported by' in message: - raise e - else: - self._cass_version = cass_version + # Detect CQL and Cassandra version + c = Cluster() + s = c.connect() + s.set_keyspace('system') + row = s.execute('SELECT cql_version, release_version FROM local')[0] + self._cql_version = self._get_version_as_tuple(row.cql_version) + self._cass_version = self._get_version_as_tuple(row.release_version) def test_blob_type_as_string(self): c = Cluster() @@ -58,7 +49,7 @@ def test_blob_type_as_string(self): query = 'INSERT INTO mytable (a, b) VALUES (%s, %s)' - if self._cass_version == '2.0': + if self._cql_version >= (3, 1, 0): # Blob values can't be specified using string notation in CQL 3.1.0 and # above which is used by default in Cassandra 2.0. msg = r'.*Invalid STRING constant \(.*?\) for b of type blob.*' @@ -227,3 +218,9 @@ def test_basic_types(self): for expected, actual in zip(expected_vals, results[0]): self.assertEquals(expected, actual) + + def _get_version_as_tuple(self, version): + version = version.split('.') + version = [int(p) for p in version] + version = tuple(version) + return version From 9627be73868250f2efd21344e4167fab854a3dcb Mon Sep 17 00:00:00 2001 From: Tomaz Muraus Date: Mon, 23 Sep 2013 16:03:11 +0200 Subject: [PATCH 5/5] Split version detection functionality into a BaseTestCase class. This seems like a useful functionality so make it more easier to re-use this information. --- tests/integration/__init__.py | 25 +++++++++++++++++++++++++ tests/integration/test_types.py | 15 +++++---------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index a71c59779a..be7ab3278c 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -24,6 +24,31 @@ os.mkdir(path) +class BaseTestCase(unittest.TestCase): + def _get_cass_and_cql_version(self): + """ + Probe system.local table to determine Cassandra and CQL version. + """ + c = Cluster() + s = c.connect() + s.set_keyspace('system') + row = s.execute('SELECT cql_version, release_version FROM local')[0] + + cass_version = self._get_version_as_tuple(row.release_version) + cql_version = self._get_version_as_tuple(row.cql_version) + + c.shutdown() + + result = {'cass_version': cass_version, 'cql_version': cql_version} + return result + + def _get_version_as_tuple(self, version): + version = version.split('.') + version = [int(p) for p in version] + version = tuple(version) + return version + + def get_cluster(): return CCM_CLUSTER diff --git a/tests/integration/test_types.py b/tests/integration/test_types.py index 8c435e6f52..5f14f010d2 100644 --- a/tests/integration/test_types.py +++ b/tests/integration/test_types.py @@ -12,18 +12,13 @@ from cassandra import InvalidRequest from cassandra.cluster import Cluster, NoHostAvailable +from tests.integration import BaseTestCase -class TypeTests(unittest.TestCase): + +class TypeTests(BaseTestCase): def setUp(self): super(TypeTests, self).setUp() - - # Detect CQL and Cassandra version - c = Cluster() - s = c.connect() - s.set_keyspace('system') - row = s.execute('SELECT cql_version, release_version FROM local')[0] - self._cql_version = self._get_version_as_tuple(row.cql_version) - self._cass_version = self._get_version_as_tuple(row.release_version) + self._versions = self._get_cass_and_cql_version() def test_blob_type_as_string(self): c = Cluster() @@ -49,7 +44,7 @@ def test_blob_type_as_string(self): query = 'INSERT INTO mytable (a, b) VALUES (%s, %s)' - if self._cql_version >= (3, 1, 0): + if self._versions['cql_version'] >= (3, 1, 0): # Blob values can't be specified using string notation in CQL 3.1.0 and # above which is used by default in Cassandra 2.0. msg = r'.*Invalid STRING constant \(.*?\) for b of type blob.*'