Skip to content

Commit

Permalink
Merge remote branch 'thobbs/add-0.8-calls' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
paul cannon committed Apr 27, 2011
2 parents cdbd60a + d4945fa commit a1613e7
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 8 deletions.
28 changes: 27 additions & 1 deletion example.py
@@ -1,45 +1,71 @@
#!/usr/bin/python
from telephus.protocol import ManagedCassandraClientFactory
from telephus.client import CassandraClient
from telephus.cassandra.ttypes import ColumnPath, ColumnParent, Column, SuperColumn
from telephus.cassandra.ttypes import (ColumnPath, ColumnParent, Column,
SuperColumn, Compression)
from twisted.internet import defer

HOST = 'localhost'
PORT = 9160
KEYSPACE = 'Keyspace1'
CF = 'Standard1'
SCF = 'Super1'
COUNT_CF = 'Counter1'
SUPERCOUNT_CF = 'SuperCounter1'
colname = 'foo'
scname = 'bar'

@defer.inlineCallbacks
def dostuff(client):
yield client.insert(key='test', column_family=CF, value='testval', column=colname)
yield client.insert(key='test', column_family=SCF, value='testval', column=colname, super_column=scname)

res = yield client.get(key='test', column_family=CF, column=colname)
print 'get', res

res = yield client.get(key='test', column_family=SCF, column=colname, super_column=scname)
print 'get (super)', res

res = yield client.get_slice(key='test', column_family=CF)
print 'get_slice', res

res = yield client.multiget(keys=['test', 'test2'], column_family=CF, column=colname)
print 'multiget', res

res = yield client.multiget_slice(keys=['test', 'test2'], column_family=CF)
print 'multiget_slice', res

res = yield client.get_count(key='test', column_family=CF)
print 'get_count', res

yield client.add(key='test', column_family=COUNT_CF, value=1, column='testcounter')
res = yield client.get(key='test', column_family=COUNT_CF, column='testcounter')
print 'get counter value', res

yield client.add(key='test', column_family=SUPERCOUNT_CF, value=1,
column='testcounter', super_column='testsuper')
res = yield client.get(key='test', column_family=SUPERCOUNT_CF,
column='testcounter', super_column='testsuper')
print 'get super counter value', res

res = yield client.execute_cql_query("SELECT * FROM %s WHERE KEY = %s" % (CF, 'test'.encode('hex')),
compression=Compression.NONE)
print 'execute_cql_query', res

# batch insert will figure out if you're trying a CF or SCF
# from the data structure
res = yield client.batch_insert(key='test', column_family=CF, mapping={colname: 'bar'})
print "batch_insert", res
res = yield client.batch_insert(key='test', column_family=SCF, mapping={'foo': {colname: 'bar'}})
print "batch_insert", res

# with ttypes, you pass a list as you would for raw thrift
# this way you can set custom timestamps
cols = [Column(colname, 'bar', 1234), Column('bar', 'baz', 54321)]
res = yield client.batch_insert(key='test', column_family=CF, mapping=cols)
print "batch_insert", res
cols = [SuperColumn(name=colname, columns=cols)]

# of course you don't have to use kwargs if the order is correct
res = yield client.batch_insert('test', SCF, cols)
print "batch_insert", res
Expand Down
21 changes: 21 additions & 0 deletions telephus/client.py
Expand Up @@ -154,6 +154,14 @@ def insert(self, key=None, column_family=None, value=None, column=None, super_co
req = ManagedThriftRequest('insert', key, cp, Column(column, value, timestamp, ttl), consistency)
return self.manager.pushRequest(req, retries=retries)

@requirekwargs('key', 'column_family', 'value', 'column')
def add(self, key=None, column_family=None, value=None, column=None, super_column=None,
consistency=None, retries=None):
cp = self._getparent(column_family, super_column)
consistency = consistency or self.consistency
req = ManagedThriftRequest('add', key, cp, CounterColumn(column, value), consistency)
return self.manager.pushRequest(req, retries=retries)

@requirekwargs('key', 'column_family')
def remove(self, key=None, column_family=None, column=None, super_column=None,
timestamp=None, consistency=None, retries=None):
Expand All @@ -163,6 +171,14 @@ def remove(self, key=None, column_family=None, column=None, super_column=None,
req = ManagedThriftRequest('remove', key, cp, timestamp, self.consistency)
return self.manager.pushRequest(req, retries=retries)

@requirekwargs('key', 'column_family', 'column')
def remove_counter(self, key=None, column_family=None, column=None, super_column=None,
consistency=None, retries=None):
cp = self._getpath(column_family, column, super_column)
consistency = consistency or self.consistency
req = ManagedThriftRequest('remove_counter', key, cp, self.consistency)
return self.manager.pushRequest(req, retries=retries)

@requirekwargs('key', 'column_family', 'mapping')
def batch_insert(self, key=None, column_family=None, mapping=None, timestamp=None,
consistency=None, retries=None, ttl=None):
Expand Down Expand Up @@ -210,6 +226,11 @@ def batch_mutate(self, mutationmap=None, timestamp=None, consistency=None, retri
req = ManagedThriftRequest('batch_mutate', mutmap, consistency)
return self.manager.pushRequest(req, retries=retries)

@requirekwargs('query')
def execute_cql_query(self, query=None, compression=Compression.GZIP, retries=None):
req = ManagedThriftRequest('execute_cql_query', query, compression)
return self.manager.pushRequest(req, retries=retries)

def _mk_cols_or_supers(self, mapping, timestamp, ttl=None, make_deletions=False):
if isinstance(mapping, list):
return mapping
Expand Down
87 changes: 80 additions & 7 deletions test/test_cassandraclient.py
Expand Up @@ -6,6 +6,7 @@
from telephus import translate
from telephus.cassandra.ttypes import *
import os
import zlib

CONNS = 5

Expand All @@ -15,6 +16,8 @@
T_KEYSPACE = 'TelephusTests2'
CF = 'Standard1'
SCF = 'Super1'
COUNTER_CF = 'Counter1'
SUPERCOUNTER_CF = 'SuperCounter1'
IDX_CF = 'IdxTestCF'
T_CF = 'TransientCF'
T_SCF = 'TransientSCF'
Expand Down Expand Up @@ -61,22 +64,35 @@ def setUp(self):
index_type=IndexType.KEYS,
index_name='idxCol1')
],
default_validation_class='org.apache.cassandra.db.marshal.BytesType')
default_validation_class='org.apache.cassandra.db.marshal.BytesType'
),
CfDef(
keyspace=KEYSPACE,
name=COUNTER_CF,
column_type='Standard',
default_validation_class='org.apache.cassandra.db.marshal.CounterColumnType'
),
CfDef(
keyspace=KEYSPACE,
name=SUPERCOUNTER_CF,
column_type='Super',
default_validation_class='org.apache.cassandra.db.marshal.CounterColumnType'
),
]
)
yield self.client.system_add_keyspace(self.my_keyspace)
yield self.client.set_keyspace(KEYSPACE)

@defer.inlineCallbacks
def tearDown(self):
yield self.client.system_drop_keyspace(self.my_keyspace.name)
self.cmanager.shutdown()
for c in reactor.getDelayedCalls():
c.cancel()
reactor.removeAll()

@defer.inlineCallbacks
def test_insert_get(self):
def test_insert_get(self):
yield self.client.insert('test', CF, 'testval', column=COLUMN)
yield self.client.insert('test2', CF, 'testval2', column=COLUMN)
yield self.client.insert('test', SCF, 'superval', column=COLUMN, super_column=SCOLUMN)
Expand All @@ -97,7 +113,7 @@ def test_batch_insert_get_slice_and_count(self):
{COLUMN: 'test', COLUMN2: 'test2'})
yield self.client.batch_insert('test', SCF,
{SCOLUMN: {COLUMN: 'test', COLUMN2: 'test2'}})
res = yield self.client.get_slice('test', CF, names=(COLUMN, COLUMN2))
res = yield self.client.get_slice('test', CF, names=(COLUMN, COLUMN2))
self.assertEqual(res[0].column.value, 'test')
self.assertEqual(res[1].column.value, 'test2')
res = yield self.client.get_slice('test', SCF, names=(COLUMN, COLUMN2),
Expand All @@ -106,7 +122,7 @@ def test_batch_insert_get_slice_and_count(self):
self.assertEqual(res[1].column.value, 'test2')
res = yield self.client.get_count('test', CF)
self.assertEqual(res, 2)

@defer.inlineCallbacks
def test_batch_mutate_and_remove(self):
yield self.client.batch_mutate({'test': {CF: {COLUMN: 'test', COLUMN2: 'test2'}, SCF: { SCOLUMN: { COLUMN: 'test', COLUMN2: 'test2'} } }, 'test2': {CF: {COLUMN: 'test', COLUMN2: 'test2'}, SCF: { SCOLUMN: { COLUMN: 'test', COLUMN2: 'test2'} } } })
Expand Down Expand Up @@ -155,7 +171,7 @@ def test_multiget_slice_remove(self):
res = yield self.client.multiget(['test', 'test2'], CF, column=COLUMN)
self.assertEqual(len(res['test']), 0)
self.assertEqual(len(res['test2']), 0)

@defer.inlineCallbacks
def test_range_slices(self):
yield self.client.insert('test', CF, 'testval', column=COLUMN)
Expand All @@ -175,6 +191,63 @@ def test_indexed_slices(self):
res = yield self.client.get_indexed_slices(IDX_CF, expressions, start_key='')
self.assertEquals(res[0].columns[0].column.value,'two')

@defer.inlineCallbacks
def test_counter_add(self):
# test standard column counter
yield self.client.add('test', COUNTER_CF, 1, column='col')
res = yield self.client.get('test', COUNTER_CF, column='col')
self.assertEquals(res.counter_column.value, 1)

yield self.client.add('test', COUNTER_CF, 1, column='col')
res = yield self.client.get('test', COUNTER_CF, column='col')
self.assertEquals(res.counter_column.value, 2)

# test super column counters
yield self.client.add('test', SUPERCOUNTER_CF, 1, column='col', super_column='scol')
res = yield self.client.get('test', SUPERCOUNTER_CF, column='col', super_column='scol')
self.assertEquals(res.counter_column.value, 1)

yield self.client.add('test', SUPERCOUNTER_CF, 1, column='col', super_column='scol')
res = yield self.client.get('test', SUPERCOUNTER_CF, column='col', super_column='scol')
self.assertEquals(res.counter_column.value, 2)

@defer.inlineCallbacks
def test_counter_remove(self):
# test standard column counter
yield self.client.add('test', COUNTER_CF, 1, column='col')
res = yield self.client.get('test', COUNTER_CF, column='col')
self.assertEquals(res.counter_column.value, 1)

yield self.client.remove_counter('test', COUNTER_CF, column='col')
yield self.assertFailure(self.client.get('test', COUNTER_CF, column='col'),
NotFoundException)

# test super column counters
yield self.client.add('test', SUPERCOUNTER_CF, 1, column='col', super_column='scol')
res = yield self.client.get('test', SUPERCOUNTER_CF, column='col', super_column='scol')
self.assertEquals(res.counter_column.value, 1)

yield self.client.remove_counter('test', SUPERCOUNTER_CF,
column='col', super_column='scol')
yield self.assertFailure(self.client.get('test', SUPERCOUNTER_CF,
column='col', super_column='scol'),
NotFoundException)

@defer.inlineCallbacks
def test_cql(self):
yield self.client.insert('test', CF, 'testval', column='col1')
res = yield self.client.get('test', CF, column='col1')
self.assertEquals(res.column.value, 'testval')

query = 'SELECT * from %s where KEY = %s' % (CF, 'test'.encode('hex'))
uncompressed_result = yield self.client.execute_cql_query(query, Compression.NONE)
self.assertEquals(uncompressed_result.rows[0].columns[0].name, 'col1')
self.assertEquals(uncompressed_result.rows[0].columns[0].value, 'testval')

compressed_query = zlib.compress(query)
compressed_result = yield self.client.execute_cql_query(compressed_query, Compression.GZIP)
self.assertEquals(uncompressed_result, compressed_result)

def sleep(self, secs):
d = defer.Deferred()
reactor.callLater(secs, d.callback, None)
Expand Down

0 comments on commit a1613e7

Please sign in to comment.