Skip to content
Browse files

Merge remote branch 'thobbs/add-0.8-calls' into n

  • Loading branch information...
2 parents 2b34f07 + d4945fa commit f93f4491e0e90a445efee77ca73c1a9a87640628 @thepaul thepaul committed
Showing with 128 additions and 8 deletions.
  1. +27 −1 example.py
  2. +21 −0 telephus/client.py
  3. +80 −7 test/test_cassandraclient.py
View
28 example.py
@@ -1,7 +1,8 @@
#!/usr/bin/python
from telephus.protocol import ManagedCassandraClientFactory
from telephus.client import CassandraClient
-from telephus.cassandra.ttypes import ColumnPath, ColumnParent, Column, SuperColumn
+from telephus.cassandra.ttypes import (ColumnPath, ColumnParent, Column,
+ SuperColumn, Compression)
from twisted.internet import defer
HOST = 'localhost'
@@ -9,6 +10,8 @@
KEYSPACE = 'Keyspace1'
CF = 'Standard1'
SCF = 'Super1'
+COUNT_CF = 'Counter1'
+SUPERCOUNT_CF = 'SuperCounter1'
colname = 'foo'
scname = 'bar'
@@ -16,30 +19,53 @@
def dostuff(client):
yield client.insert(key='test', column_family=CF, value='testval', column=colname)
yield client.insert(key='test', column_family=SCF, value='testval', column=colname, super_column=scname)
+
res = yield client.get(key='test', column_family=CF, column=colname)
print 'get', res
+
res = yield client.get(key='test', column_family=SCF, column=colname, super_column=scname)
print 'get (super)', res
+
res = yield client.get_slice(key='test', column_family=CF)
print 'get_slice', res
+
res = yield client.multiget(keys=['test', 'test2'], column_family=CF, column=colname)
print 'multiget', res
+
res = yield client.multiget_slice(keys=['test', 'test2'], column_family=CF)
print 'multiget_slice', res
+
res = yield client.get_count(key='test', column_family=CF)
print 'get_count', res
+
+ yield client.add(key='test', column_family=COUNT_CF, value=1, column='testcounter')
+ res = yield client.get(key='test', column_family=COUNT_CF, column='testcounter')
+ print 'get counter value', res
+
+ yield client.add(key='test', column_family=SUPERCOUNT_CF, value=1,
+ column='testcounter', super_column='testsuper')
+ res = yield client.get(key='test', column_family=SUPERCOUNT_CF,
+ column='testcounter', super_column='testsuper')
+ print 'get super counter value', res
+
+ res = yield client.execute_cql_query("SELECT * FROM %s WHERE KEY = %s" % (CF, 'test'.encode('hex')),
+ compression=Compression.NONE)
+ print 'execute_cql_query', res
+
# batch insert will figure out if you're trying a CF or SCF
# from the data structure
res = yield client.batch_insert(key='test', column_family=CF, mapping={colname: 'bar'})
print "batch_insert", res
res = yield client.batch_insert(key='test', column_family=SCF, mapping={'foo': {colname: 'bar'}})
print "batch_insert", res
+
# with ttypes, you pass a list as you would for raw thrift
# this way you can set custom timestamps
cols = [Column(colname, 'bar', 1234), Column('bar', 'baz', 54321)]
res = yield client.batch_insert(key='test', column_family=CF, mapping=cols)
print "batch_insert", res
cols = [SuperColumn(name=colname, columns=cols)]
+
# of course you don't have to use kwargs if the order is correct
res = yield client.batch_insert('test', SCF, cols)
print "batch_insert", res
View
21 telephus/client.py
@@ -154,6 +154,14 @@ def insert(self, key=None, column_family=None, value=None, column=None, super_co
req = ManagedThriftRequest('insert', key, cp, Column(column, value, timestamp, ttl), consistency)
return self.manager.pushRequest(req, retries=retries)
+ @requirekwargs('key', 'column_family', 'value', 'column')
+ def add(self, key=None, column_family=None, value=None, column=None, super_column=None,
+ consistency=None, retries=None):
+ cp = self._getparent(column_family, super_column)
+ consistency = consistency or self.consistency
+ req = ManagedThriftRequest('add', key, cp, CounterColumn(column, value), consistency)
+ return self.manager.pushRequest(req, retries=retries)
+
@requirekwargs('key', 'column_family')
def remove(self, key=None, column_family=None, column=None, super_column=None,
timestamp=None, consistency=None, retries=None):
@@ -163,6 +171,14 @@ def remove(self, key=None, column_family=None, column=None, super_column=None,
req = ManagedThriftRequest('remove', key, cp, timestamp, self.consistency)
return self.manager.pushRequest(req, retries=retries)
+ @requirekwargs('key', 'column_family', 'column')
+ def remove_counter(self, key=None, column_family=None, column=None, super_column=None,
+ consistency=None, retries=None):
+ cp = self._getpath(column_family, column, super_column)
+ consistency = consistency or self.consistency
+ req = ManagedThriftRequest('remove_counter', key, cp, self.consistency)
+ return self.manager.pushRequest(req, retries=retries)
+
@requirekwargs('key', 'column_family', 'mapping')
def batch_insert(self, key=None, column_family=None, mapping=None, timestamp=None,
consistency=None, retries=None, ttl=None):
@@ -210,6 +226,11 @@ def batch_mutate(self, mutationmap=None, timestamp=None, consistency=None, retri
req = ManagedThriftRequest('batch_mutate', mutmap, consistency)
return self.manager.pushRequest(req, retries=retries)
+ @requirekwargs('query')
+ def execute_cql_query(self, query=None, compression=Compression.GZIP, retries=None):
+ req = ManagedThriftRequest('execute_cql_query', query, compression)
+ return self.manager.pushRequest(req, retries=retries)
+
def _mk_cols_or_supers(self, mapping, timestamp, ttl=None, make_deletions=False):
if isinstance(mapping, list):
return mapping
View
87 test/test_cassandraclient.py
@@ -6,6 +6,7 @@
from telephus.cassandra import constants
from telephus.cassandra.ttypes import *
import os
+import zlib
CONNS = 5
@@ -15,6 +16,8 @@
T_KEYSPACE = 'TelephusTests2'
CF = 'Standard1'
SCF = 'Super1'
+COUNTER_CF = 'Counter1'
+SUPERCOUNTER_CF = 'SuperCounter1'
IDX_CF = 'IdxTestCF'
T_CF = 'TransientCF'
T_SCF = 'TransientSCF'
@@ -61,12 +64,25 @@ def setUp(self):
index_type=IndexType.KEYS,
index_name='idxCol1')
],
- default_validation_class='org.apache.cassandra.db.marshal.BytesType')
+ default_validation_class='org.apache.cassandra.db.marshal.BytesType'
+ ),
+ CfDef(
+ keyspace=KEYSPACE,
+ name=COUNTER_CF,
+ column_type='Standard',
+ default_validation_class='org.apache.cassandra.db.marshal.CounterColumnType'
+ ),
+ CfDef(
+ keyspace=KEYSPACE,
+ name=SUPERCOUNTER_CF,
+ column_type='Super',
+ default_validation_class='org.apache.cassandra.db.marshal.CounterColumnType'
+ ),
]
)
yield self.client.system_add_keyspace(self.my_keyspace)
yield self.client.set_keyspace(KEYSPACE)
-
+
@defer.inlineCallbacks
def tearDown(self):
yield self.client.system_drop_keyspace(self.my_keyspace.name)
@@ -74,9 +90,9 @@ def tearDown(self):
for c in reactor.getDelayedCalls():
c.cancel()
reactor.removeAll()
-
+
@defer.inlineCallbacks
- def test_insert_get(self):
+ def test_insert_get(self):
yield self.client.insert('test', CF, 'testval', column=COLUMN)
yield self.client.insert('test2', CF, 'testval2', column=COLUMN)
yield self.client.insert('test', SCF, 'superval', column=COLUMN, super_column=SCOLUMN)
@@ -97,7 +113,7 @@ def test_batch_insert_get_slice_and_count(self):
{COLUMN: 'test', COLUMN2: 'test2'})
yield self.client.batch_insert('test', SCF,
{SCOLUMN: {COLUMN: 'test', COLUMN2: 'test2'}})
- res = yield self.client.get_slice('test', CF, names=(COLUMN, COLUMN2))
+ res = yield self.client.get_slice('test', CF, names=(COLUMN, COLUMN2))
self.assertEqual(res[0].column.value, 'test')
self.assertEqual(res[1].column.value, 'test2')
res = yield self.client.get_slice('test', SCF, names=(COLUMN, COLUMN2),
@@ -106,7 +122,7 @@ def test_batch_insert_get_slice_and_count(self):
self.assertEqual(res[1].column.value, 'test2')
res = yield self.client.get_count('test', CF)
self.assertEqual(res, 2)
-
+
@defer.inlineCallbacks
def test_batch_mutate_and_remove(self):
yield self.client.batch_mutate({'test': {CF: {COLUMN: 'test', COLUMN2: 'test2'}, SCF: { SCOLUMN: { COLUMN: 'test', COLUMN2: 'test2'} } }, 'test2': {CF: {COLUMN: 'test', COLUMN2: 'test2'}, SCF: { SCOLUMN: { COLUMN: 'test', COLUMN2: 'test2'} } } })
@@ -155,7 +171,7 @@ def test_multiget_slice_remove(self):
res = yield self.client.multiget(['test', 'test2'], CF, column=COLUMN)
self.assertEqual(len(res['test']), 0)
self.assertEqual(len(res['test2']), 0)
-
+
@defer.inlineCallbacks
def test_range_slices(self):
yield self.client.insert('test', CF, 'testval', column=COLUMN)
@@ -175,6 +191,63 @@ def test_indexed_slices(self):
res = yield self.client.get_indexed_slices(IDX_CF, expressions, start_key='')
self.assertEquals(res[0].columns[0].column.value,'two')
+ @defer.inlineCallbacks
+ def test_counter_add(self):
+ # test standard column counter
+ yield self.client.add('test', COUNTER_CF, 1, column='col')
+ res = yield self.client.get('test', COUNTER_CF, column='col')
+ self.assertEquals(res.counter_column.value, 1)
+
+ yield self.client.add('test', COUNTER_CF, 1, column='col')
+ res = yield self.client.get('test', COUNTER_CF, column='col')
+ self.assertEquals(res.counter_column.value, 2)
+
+ # test super column counters
+ yield self.client.add('test', SUPERCOUNTER_CF, 1, column='col', super_column='scol')
+ res = yield self.client.get('test', SUPERCOUNTER_CF, column='col', super_column='scol')
+ self.assertEquals(res.counter_column.value, 1)
+
+ yield self.client.add('test', SUPERCOUNTER_CF, 1, column='col', super_column='scol')
+ res = yield self.client.get('test', SUPERCOUNTER_CF, column='col', super_column='scol')
+ self.assertEquals(res.counter_column.value, 2)
+
+ @defer.inlineCallbacks
+ def test_counter_remove(self):
+ # test standard column counter
+ yield self.client.add('test', COUNTER_CF, 1, column='col')
+ res = yield self.client.get('test', COUNTER_CF, column='col')
+ self.assertEquals(res.counter_column.value, 1)
+
+ yield self.client.remove_counter('test', COUNTER_CF, column='col')
+ yield self.assertFailure(self.client.get('test', COUNTER_CF, column='col'),
+ NotFoundException)
+
+ # test super column counters
+ yield self.client.add('test', SUPERCOUNTER_CF, 1, column='col', super_column='scol')
+ res = yield self.client.get('test', SUPERCOUNTER_CF, column='col', super_column='scol')
+ self.assertEquals(res.counter_column.value, 1)
+
+ yield self.client.remove_counter('test', SUPERCOUNTER_CF,
+ column='col', super_column='scol')
+ yield self.assertFailure(self.client.get('test', SUPERCOUNTER_CF,
+ column='col', super_column='scol'),
+ NotFoundException)
+
+ @defer.inlineCallbacks
+ def test_cql(self):
+ yield self.client.insert('test', CF, 'testval', column='col1')
+ res = yield self.client.get('test', CF, column='col1')
+ self.assertEquals(res.column.value, 'testval')
+
+ query = 'SELECT * from %s where KEY = %s' % (CF, 'test'.encode('hex'))
+ uncompressed_result = yield self.client.execute_cql_query(query, Compression.NONE)
+ self.assertEquals(uncompressed_result.rows[0].columns[0].name, 'col1')
+ self.assertEquals(uncompressed_result.rows[0].columns[0].value, 'testval')
+
+ compressed_query = zlib.compress(query)
+ compressed_result = yield self.client.execute_cql_query(compressed_query, Compression.GZIP)
+ self.assertEquals(uncompressed_result, compressed_result)
+
def sleep(self, secs):
d = defer.Deferred()
reactor.callLater(secs, d.callback, None)

0 comments on commit f93f449

Please sign in to comment.
Something went wrong with that request. Please try again.