Browse files

Release 0.7~beta1, merge fixes from Paul Cannon

  • Loading branch information...
1 parent f2402a3 commit 6f5dce7b41931b6220158070fe210cdcc68f380a @driftx committed Aug 15, 2010
View
8 debian/changelog
@@ -1,3 +1,11 @@
+python-telephus (0.7~beta1) unstable; urgency=low
+
+ * Add describe_version
+ * Update to work with Cassandra 0.7.x HEAD r985573
+ * Merge updates from Paul Cannon
+
+ -- Brandon Williams <brandon@riptano.com> Sun, 15 Aug 2010 08:40:00 +0000
+
python-telephus (0.6.2) unstable; urgency=low
* Fix double errbacks
View
2 telephus/cassandra/Cassandra-remote
@@ -40,7 +40,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
print ' string describe_version()'
print ' describe_ring(string keyspace)'
print ' string describe_partitioner()'
- print ' describe_keyspace(string keyspace)'
+ print ' KsDef describe_keyspace(string keyspace)'
print ' describe_splits(string keyspace, string cfName, string start_token, string end_token, i32 keys_per_split)'
print ' string system_add_column_family(CfDef cf_def)'
print ' string system_drop_column_family(string column_family)'
View
1,430 telephus/cassandra/Cassandra.py
@@ -14,23 +14,26 @@
except:
fastbinary = None
+from zope.interface import Interface, implements
+from twisted.internet import defer
+from thrift.transport import TTwisted
-class Iface:
- def login(self, auth_request):
+class Iface(Interface):
+ def login(auth_request):
"""
Parameters:
- auth_request
"""
pass
- def set_keyspace(self, keyspace):
+ def set_keyspace(keyspace):
"""
Parameters:
- keyspace
"""
pass
- def get(self, key, column_path, consistency_level):
+ def get(key, column_path, consistency_level):
"""
Get the Column or SuperColumn at the given column_path. If no value is present, NotFoundException is thrown. (This is
the only method that can throw an exception under non-failure conditions.)
@@ -42,7 +45,7 @@ def get(self, key, column_path, consistency_level):
"""
pass
- def get_slice(self, key, column_parent, predicate, consistency_level):
+ def get_slice(key, column_parent, predicate, consistency_level):
"""
Get the group of columns contained by column_parent (either a ColumnFamily name or a ColumnFamily/SuperColumn name
pair) specified by the given SlicePredicate. If no matching values are found, an empty list is returned.
@@ -55,7 +58,7 @@ def get_slice(self, key, column_parent, predicate, consistency_level):
"""
pass
- def get_count(self, key, column_parent, predicate, consistency_level):
+ def get_count(key, column_parent, predicate, consistency_level):
"""
returns the number of columns matching <code>predicate</code> for a particular <code>key</code>,
<code>ColumnFamily</code> and optionally <code>SuperColumn</code>.
@@ -68,7 +71,7 @@ def get_count(self, key, column_parent, predicate, consistency_level):
"""
pass
- def multiget_slice(self, keys, column_parent, predicate, consistency_level):
+ def multiget_slice(keys, column_parent, predicate, consistency_level):
"""
Performs a get_slice for column_parent and predicate for the given keys in parallel.
@@ -80,7 +83,7 @@ def multiget_slice(self, keys, column_parent, predicate, consistency_level):
"""
pass
- def multiget_count(self, keyspace, keys, column_parent, predicate, consistency_level):
+ def multiget_count(keyspace, keys, column_parent, predicate, consistency_level):
"""
Perform a get_count in parallel on the given list<binary> keys. The return value maps keys to the count found.
@@ -93,7 +96,7 @@ def multiget_count(self, keyspace, keys, column_parent, predicate, consistency_l
"""
pass
- def get_range_slices(self, column_parent, predicate, range, consistency_level):
+ def get_range_slices(column_parent, predicate, range, consistency_level):
"""
returns a subset of columns for a contiguous range of keys.
@@ -105,7 +108,7 @@ def get_range_slices(self, column_parent, predicate, range, consistency_level):
"""
pass
- def get_indexed_slices(self, column_parent, index_clause, column_predicate, consistency_level):
+ def get_indexed_slices(column_parent, index_clause, column_predicate, consistency_level):
"""
Returns the subset of columns specified in SlicePredicate for the rows matching the IndexClause
@@ -117,7 +120,7 @@ def get_indexed_slices(self, column_parent, index_clause, column_predicate, cons
"""
pass
- def insert(self, key, column_parent, column, consistency_level):
+ def insert(key, column_parent, column, consistency_level):
"""
Insert a Column at the given column_parent.column_family and optional column_parent.super_column.
@@ -129,7 +132,7 @@ def insert(self, key, column_parent, column, consistency_level):
"""
pass
- def remove(self, key, column_path, clock, consistency_level):
+ def remove(key, column_path, clock, consistency_level):
"""
Remove data from the row specified by key at the granularity specified by column_path, and the given clock. Note
that all the values in column_path besides column_path.column_family are truly optional: you can remove the entire
@@ -143,7 +146,7 @@ def remove(self, key, column_path, clock, consistency_level):
"""
pass
- def batch_mutate(self, mutation_map, consistency_level):
+ def batch_mutate(mutation_map, consistency_level):
"""
Mutate many columns or super columns for many row keys. See also: Mutation.
@@ -156,7 +159,7 @@ def batch_mutate(self, mutation_map, consistency_level):
"""
pass
- def truncate(self, cfname):
+ def truncate(cfname):
"""
Truncate will mark and entire column family as deleted.
From the user's perspective a successful call to truncate will result complete data deletion from cfname.
@@ -170,33 +173,33 @@ def truncate(self, cfname):
"""
pass
- def check_schema_agreement(self, ):
+ def check_schema_agreement():
"""
ask the cluster if they all are using the same migration id. returns a map of version->hosts-on-that-version.
hosts that did not respond will be under the key DatabaseDescriptor.INITIAL_VERSION. agreement can be determined
by checking if the size of the map is 1.
"""
pass
- def describe_keyspaces(self, ):
+ def describe_keyspaces():
"""
list the defined keyspaces in this cluster
"""
pass
- def describe_cluster_name(self, ):
+ def describe_cluster_name():
"""
get the cluster name
"""
pass
- def describe_version(self, ):
+ def describe_version():
"""
get the thrift api version
"""
pass
- def describe_ring(self, keyspace):
+ def describe_ring(keyspace):
"""
get the token ring: a map of ranges to host addresses,
represented as a set of TokenRange instead of a map from range
@@ -212,13 +215,13 @@ def describe_ring(self, keyspace):
"""
pass
- def describe_partitioner(self, ):
+ def describe_partitioner():
"""
returns the partitioner used by this cluster
"""
pass
- def describe_keyspace(self, keyspace):
+ def describe_keyspace(keyspace):
"""
describe specified keyspace
@@ -227,7 +230,7 @@ def describe_keyspace(self, keyspace):
"""
pass
- def describe_splits(self, keyspace, cfName, start_token, end_token, keys_per_split):
+ def describe_splits(keyspace, cfName, start_token, end_token, keys_per_split):
"""
experimental API for hadoop/parallel query support.
may change violently and without warning.
@@ -244,7 +247,7 @@ def describe_splits(self, keyspace, cfName, start_token, end_token, keys_per_spl
"""
pass
- def system_add_column_family(self, cf_def):
+ def system_add_column_family(cf_def):
"""
adds a column family. returns the new schema id.
@@ -253,7 +256,7 @@ def system_add_column_family(self, cf_def):
"""
pass
- def system_drop_column_family(self, column_family):
+ def system_drop_column_family(column_family):
"""
drops a column family. returns the new schema id.
@@ -262,7 +265,7 @@ def system_drop_column_family(self, column_family):
"""
pass
- def system_rename_column_family(self, old_name, new_name):
+ def system_rename_column_family(old_name, new_name):
"""
renames a column family. returns the new schema id.
@@ -272,7 +275,7 @@ def system_rename_column_family(self, old_name, new_name):
"""
pass
- def system_add_keyspace(self, ks_def):
+ def system_add_keyspace(ks_def):
"""
adds a keyspace and any column families that are part of it. returns the new schema id.
@@ -281,7 +284,7 @@ def system_add_keyspace(self, ks_def):
"""
pass
- def system_drop_keyspace(self, keyspace):
+ def system_drop_keyspace(keyspace):
"""
drops a keyspace and any column families that are part of it. returns the new schema id.
@@ -290,7 +293,7 @@ def system_drop_keyspace(self, keyspace):
"""
pass
- def system_rename_keyspace(self, old_name, new_name):
+ def system_rename_keyspace(old_name, new_name):
"""
renames a keyspace. returns the new schema id.
@@ -301,76 +304,84 @@ def system_rename_keyspace(self, old_name, new_name):
pass
-class Client(Iface):
- def __init__(self, iprot, oprot=None):
- self._iprot = self._oprot = iprot
- if oprot != None:
- self._oprot = oprot
+class Client:
+ implements(Iface)
+
+ def __init__(self, transport, oprot_factory):
+ self._transport = transport
+ self._oprot_factory = oprot_factory
self._seqid = 0
+ self._reqs = {}
def login(self, auth_request):
"""
Parameters:
- auth_request
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_login(auth_request)
- return self.recv_login()
+ return d
def send_login(self, auth_request):
- self._oprot.writeMessageBegin('login', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('login', TMessageType.CALL, self._seqid)
args = login_args()
args.auth_request = auth_request
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_login(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_login(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = login_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.authnx != None:
- raise result.authnx
+ return d.errback(result.authnx)
if result.authzx != None:
- raise result.authzx
- raise TApplicationException(TApplicationException.MISSING_RESULT, "login failed: unknown result");
+ return d.errback(result.authzx)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "login failed: unknown result"))
def set_keyspace(self, keyspace):
"""
Parameters:
- keyspace
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_set_keyspace(keyspace)
- self.recv_set_keyspace()
+ return d
def send_set_keyspace(self, keyspace):
- self._oprot.writeMessageBegin('set_keyspace', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('set_keyspace', TMessageType.CALL, self._seqid)
args = set_keyspace_args()
args.keyspace = keyspace
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_set_keyspace(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_set_keyspace(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = set_keyspace_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.ire != None:
- raise result.ire
- return
+ return d.errback(result.ire)
+ return d.callback(None)
def get(self, key, column_path, consistency_level):
"""
@@ -382,40 +393,43 @@ def get(self, key, column_path, consistency_level):
- column_path
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_get(key, column_path, consistency_level)
- return self.recv_get()
+ return d
def send_get(self, key, column_path, consistency_level):
- self._oprot.writeMessageBegin('get', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('get', TMessageType.CALL, self._seqid)
args = get_args()
args.key = key
args.column_path = column_path
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_get(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_get(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = get_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.nfe != None:
- raise result.nfe
+ return d.errback(result.nfe)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- raise TApplicationException(TApplicationException.MISSING_RESULT, "get failed: unknown result");
+ return d.errback(result.te)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "get failed: unknown result"))
def get_slice(self, key, column_parent, predicate, consistency_level):
"""
@@ -428,39 +442,42 @@ def get_slice(self, key, column_parent, predicate, consistency_level):
- predicate
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_get_slice(key, column_parent, predicate, consistency_level)
- return self.recv_get_slice()
+ return d
def send_get_slice(self, key, column_parent, predicate, consistency_level):
- self._oprot.writeMessageBegin('get_slice', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('get_slice', TMessageType.CALL, self._seqid)
args = get_slice_args()
args.key = key
args.column_parent = column_parent
args.predicate = predicate
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_get_slice(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_get_slice(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = get_slice_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- raise TApplicationException(TApplicationException.MISSING_RESULT, "get_slice failed: unknown result");
+ return d.errback(result.te)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "get_slice failed: unknown result"))
def get_count(self, key, column_parent, predicate, consistency_level):
"""
@@ -473,39 +490,42 @@ def get_count(self, key, column_parent, predicate, consistency_level):
- predicate
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_get_count(key, column_parent, predicate, consistency_level)
- return self.recv_get_count()
+ return d
def send_get_count(self, key, column_parent, predicate, consistency_level):
- self._oprot.writeMessageBegin('get_count', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('get_count', TMessageType.CALL, self._seqid)
args = get_count_args()
args.key = key
args.column_parent = column_parent
args.predicate = predicate
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_get_count(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_get_count(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = get_count_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- raise TApplicationException(TApplicationException.MISSING_RESULT, "get_count failed: unknown result");
+ return d.errback(result.te)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "get_count failed: unknown result"))
def multiget_slice(self, keys, column_parent, predicate, consistency_level):
"""
@@ -517,39 +537,42 @@ def multiget_slice(self, keys, column_parent, predicate, consistency_level):
- predicate
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_multiget_slice(keys, column_parent, predicate, consistency_level)
- return self.recv_multiget_slice()
+ return d
def send_multiget_slice(self, keys, column_parent, predicate, consistency_level):
- self._oprot.writeMessageBegin('multiget_slice', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('multiget_slice', TMessageType.CALL, self._seqid)
args = multiget_slice_args()
args.keys = keys
args.column_parent = column_parent
args.predicate = predicate
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_multiget_slice(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_multiget_slice(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = multiget_slice_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- raise TApplicationException(TApplicationException.MISSING_RESULT, "multiget_slice failed: unknown result");
+ return d.errback(result.te)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "multiget_slice failed: unknown result"))
def multiget_count(self, keyspace, keys, column_parent, predicate, consistency_level):
"""
@@ -562,40 +585,43 @@ def multiget_count(self, keyspace, keys, column_parent, predicate, consistency_l
- predicate
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_multiget_count(keyspace, keys, column_parent, predicate, consistency_level)
- return self.recv_multiget_count()
+ return d
def send_multiget_count(self, keyspace, keys, column_parent, predicate, consistency_level):
- self._oprot.writeMessageBegin('multiget_count', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('multiget_count', TMessageType.CALL, self._seqid)
args = multiget_count_args()
args.keyspace = keyspace
args.keys = keys
args.column_parent = column_parent
args.predicate = predicate
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_multiget_count(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_multiget_count(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = multiget_count_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- raise TApplicationException(TApplicationException.MISSING_RESULT, "multiget_count failed: unknown result");
+ return d.errback(result.te)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "multiget_count failed: unknown result"))
def get_range_slices(self, column_parent, predicate, range, consistency_level):
"""
@@ -607,39 +633,42 @@ def get_range_slices(self, column_parent, predicate, range, consistency_level):
- range
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_get_range_slices(column_parent, predicate, range, consistency_level)
- return self.recv_get_range_slices()
+ return d
def send_get_range_slices(self, column_parent, predicate, range, consistency_level):
- self._oprot.writeMessageBegin('get_range_slices', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('get_range_slices', TMessageType.CALL, self._seqid)
args = get_range_slices_args()
args.column_parent = column_parent
args.predicate = predicate
args.range = range
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_get_range_slices(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_get_range_slices(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = get_range_slices_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- raise TApplicationException(TApplicationException.MISSING_RESULT, "get_range_slices failed: unknown result");
+ return d.errback(result.te)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "get_range_slices failed: unknown result"))
def get_indexed_slices(self, column_parent, index_clause, column_predicate, consistency_level):
"""
@@ -651,39 +680,42 @@ def get_indexed_slices(self, column_parent, index_clause, column_predicate, cons
- column_predicate
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_get_indexed_slices(column_parent, index_clause, column_predicate, consistency_level)
- return self.recv_get_indexed_slices()
+ return d
def send_get_indexed_slices(self, column_parent, index_clause, column_predicate, consistency_level):
- self._oprot.writeMessageBegin('get_indexed_slices', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('get_indexed_slices', TMessageType.CALL, self._seqid)
args = get_indexed_slices_args()
args.column_parent = column_parent
args.index_clause = index_clause
args.column_predicate = column_predicate
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_get_indexed_slices(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_get_indexed_slices(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = get_indexed_slices_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- raise TApplicationException(TApplicationException.MISSING_RESULT, "get_indexed_slices failed: unknown result");
+ return d.errback(result.te)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "get_indexed_slices failed: unknown result"))
def insert(self, key, column_parent, column, consistency_level):
"""
@@ -695,37 +727,40 @@ def insert(self, key, column_parent, column, consistency_level):
- column
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_insert(key, column_parent, column, consistency_level)
- self.recv_insert()
+ return d
def send_insert(self, key, column_parent, column, consistency_level):
- self._oprot.writeMessageBegin('insert', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('insert', TMessageType.CALL, self._seqid)
args = insert_args()
args.key = key
args.column_parent = column_parent
args.column = column
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_insert(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_insert(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = insert_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- return
+ return d.errback(result.te)
+ return d.callback(None)
def remove(self, key, column_path, clock, consistency_level):
"""
@@ -739,37 +774,40 @@ def remove(self, key, column_path, clock, consistency_level):
- clock
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_remove(key, column_path, clock, consistency_level)
- self.recv_remove()
+ return d
def send_remove(self, key, column_path, clock, consistency_level):
- self._oprot.writeMessageBegin('remove', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('remove', TMessageType.CALL, self._seqid)
args = remove_args()
args.key = key
args.column_path = column_path
args.clock = clock
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_remove(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_remove(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = remove_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- return
+ return d.errback(result.te)
+ return d.callback(None)
def batch_mutate(self, mutation_map, consistency_level):
"""
@@ -782,35 +820,38 @@ def batch_mutate(self, mutation_map, consistency_level):
- mutation_map
- consistency_level
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_batch_mutate(mutation_map, consistency_level)
- self.recv_batch_mutate()
+ return d
def send_batch_mutate(self, mutation_map, consistency_level):
- self._oprot.writeMessageBegin('batch_mutate', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('batch_mutate', TMessageType.CALL, self._seqid)
args = batch_mutate_args()
args.mutation_map = mutation_map
args.consistency_level = consistency_level
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_batch_mutate(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_batch_mutate(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = batch_mutate_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
+ return d.errback(result.ue)
if result.te != None:
- raise result.te
- return
+ return d.errback(result.te)
+ return d.callback(None)
def truncate(self, cfname):
"""
@@ -824,148 +865,163 @@ def truncate(self, cfname):
Parameters:
- cfname
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_truncate(cfname)
- self.recv_truncate()
+ return d
def send_truncate(self, cfname):
- self._oprot.writeMessageBegin('truncate', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('truncate', TMessageType.CALL, self._seqid)
args = truncate_args()
args.cfname = cfname
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_truncate(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_truncate(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = truncate_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.ire != None:
- raise result.ire
+ return d.errback(result.ire)
if result.ue != None:
- raise result.ue
- return
+ return d.errback(result.ue)
+ return d.callback(None)
def check_schema_agreement(self, ):
"""
ask the cluster if they all are using the same migration id. returns a map of version->hosts-on-that-version.
hosts that did not respond will be under the key DatabaseDescriptor.INITIAL_VERSION. agreement can be determined
by checking if the size of the map is 1.
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_check_schema_agreement()
- return self.recv_check_schema_agreement()
+ return d
def send_check_schema_agreement(self, ):
- self._oprot.writeMessageBegin('check_schema_agreement', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('check_schema_agreement', TMessageType.CALL, self._seqid)
args = check_schema_agreement_args()
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_check_schema_agreement(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_check_schema_agreement(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = check_schema_agreement_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
- raise TApplicationException(TApplicationException.MISSING_RESULT, "check_schema_agreement failed: unknown result");
+ return d.errback(result.ire)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "check_schema_agreement failed: unknown result"))
def describe_keyspaces(self, ):
"""
list the defined keyspaces in this cluster
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_describe_keyspaces()
- return self.recv_describe_keyspaces()
+ return d
def send_describe_keyspaces(self, ):
- self._oprot.writeMessageBegin('describe_keyspaces', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('describe_keyspaces', TMessageType.CALL, self._seqid)
args = describe_keyspaces_args()
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_describe_keyspaces(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_describe_keyspaces(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = describe_keyspaces_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
- raise TApplicationException(TApplicationException.MISSING_RESULT, "describe_keyspaces failed: unknown result");
+ return d.callback(result.success)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "describe_keyspaces failed: unknown result"))
def describe_cluster_name(self, ):
"""
get the cluster name
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_describe_cluster_name()
- return self.recv_describe_cluster_name()
+ return d
def send_describe_cluster_name(self, ):
- self._oprot.writeMessageBegin('describe_cluster_name', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('describe_cluster_name', TMessageType.CALL, self._seqid)
args = describe_cluster_name_args()
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_describe_cluster_name(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_describe_cluster_name(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = describe_cluster_name_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
- raise TApplicationException(TApplicationException.MISSING_RESULT, "describe_cluster_name failed: unknown result");
+ return d.callback(result.success)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "describe_cluster_name failed: unknown result"))
def describe_version(self, ):
"""
get the thrift api version
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_describe_version()
- return self.recv_describe_version()
+ return d
def send_describe_version(self, ):
- self._oprot.writeMessageBegin('describe_version', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('describe_version', TMessageType.CALL, self._seqid)
args = describe_version_args()
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_describe_version(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_describe_version(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = describe_version_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
- raise TApplicationException(TApplicationException.MISSING_RESULT, "describe_version failed: unknown result");
+ return d.callback(result.success)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "describe_version failed: unknown result"))
def describe_ring(self, keyspace):
"""
@@ -981,60 +1037,66 @@ def describe_ring(self, keyspace):
Parameters:
- keyspace
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_describe_ring(keyspace)
- return self.recv_describe_ring()
+ return d
def send_describe_ring(self, keyspace):
- self._oprot.writeMessageBegin('describe_ring', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('describe_ring', TMessageType.CALL, self._seqid)
args = describe_ring_args()
args.keyspace = keyspace
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_describe_ring(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_describe_ring(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = describe_ring_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
- raise TApplicationException(TApplicationException.MISSING_RESULT, "describe_ring failed: unknown result");
+ return d.errback(result.ire)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "describe_ring failed: unknown result"))
def describe_partitioner(self, ):
"""
returns the partitioner used by this cluster
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_describe_partitioner()
- return self.recv_describe_partitioner()
+ return d
def send_describe_partitioner(self, ):
- self._oprot.writeMessageBegin('describe_partitioner', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('describe_partitioner', TMessageType.CALL, self._seqid)
args = describe_partitioner_args()
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_describe_partitioner(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_describe_partitioner(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = describe_partitioner_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
- raise TApplicationException(TApplicationException.MISSING_RESULT, "describe_partitioner failed: unknown result");
+ return d.callback(result.success)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "describe_partitioner failed: unknown result"))
def describe_keyspace(self, keyspace):
"""
@@ -1043,32 +1105,35 @@ def describe_keyspace(self, keyspace):
Parameters:
- keyspace
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_describe_keyspace(keyspace)
- return self.recv_describe_keyspace()
+ return d
def send_describe_keyspace(self, keyspace):
- self._oprot.writeMessageBegin('describe_keyspace', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('describe_keyspace', TMessageType.CALL, self._seqid)
args = describe_keyspace_args()
args.keyspace = keyspace
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_describe_keyspace(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_describe_keyspace(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = describe_keyspace_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.nfe != None:
- raise result.nfe
- raise TApplicationException(TApplicationException.MISSING_RESULT, "describe_keyspace failed: unknown result");
+ return d.errback(result.nfe)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "describe_keyspace failed: unknown result"))
def describe_splits(self, keyspace, cfName, start_token, end_token, keys_per_split):
"""
@@ -1085,34 +1150,37 @@ def describe_splits(self, keyspace, cfName, start_token, end_token, keys_per_spl
- end_token
- keys_per_split
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_describe_splits(keyspace, cfName, start_token, end_token, keys_per_split)
- return self.recv_describe_splits()
+ return d
def send_describe_splits(self, keyspace, cfName, start_token, end_token, keys_per_split):
- self._oprot.writeMessageBegin('describe_splits', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('describe_splits', TMessageType.CALL, self._seqid)
args = describe_splits_args()
args.keyspace = keyspace
args.cfName = cfName
args.start_token = start_token
args.end_token = end_token
args.keys_per_split = keys_per_split
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_describe_splits(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_describe_splits(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = describe_splits_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
- raise TApplicationException(TApplicationException.MISSING_RESULT, "describe_splits failed: unknown result");
+ return d.callback(result.success)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "describe_splits failed: unknown result"))
def system_add_column_family(self, cf_def):
"""
@@ -1121,32 +1189,35 @@ def system_add_column_family(self, cf_def):
Parameters:
- cf_def
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_system_add_column_family(cf_def)
- return self.recv_system_add_column_family()
+ return d
def send_system_add_column_family(self, cf_def):
- self._oprot.writeMessageBegin('system_add_column_family', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('system_add_column_family', TMessageType.CALL, self._seqid)
args = system_add_column_family_args()
args.cf_def = cf_def
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_system_add_column_family(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_system_add_column_family(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = system_add_column_family_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
- raise TApplicationException(TApplicationException.MISSING_RESULT, "system_add_column_family failed: unknown result");
+ return d.errback(result.ire)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "system_add_column_family failed: unknown result"))
def system_drop_column_family(self, column_family):
"""
@@ -1155,32 +1226,35 @@ def system_drop_column_family(self, column_family):
Parameters:
- column_family
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_system_drop_column_family(column_family)
- return self.recv_system_drop_column_family()
+ return d
def send_system_drop_column_family(self, column_family):
- self._oprot.writeMessageBegin('system_drop_column_family', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('system_drop_column_family', TMessageType.CALL, self._seqid)
args = system_drop_column_family_args()
args.column_family = column_family
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_system_drop_column_family(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_system_drop_column_family(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = system_drop_column_family_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
- raise TApplicationException(TApplicationException.MISSING_RESULT, "system_drop_column_family failed: unknown result");
+ return d.errback(result.ire)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "system_drop_column_family failed: unknown result"))
def system_rename_column_family(self, old_name, new_name):
"""
@@ -1190,33 +1264,36 @@ def system_rename_column_family(self, old_name, new_name):
- old_name
- new_name
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_system_rename_column_family(old_name, new_name)
- return self.recv_system_rename_column_family()
+ return d
def send_system_rename_column_family(self, old_name, new_name):
- self._oprot.writeMessageBegin('system_rename_column_family', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('system_rename_column_family', TMessageType.CALL, self._seqid)
args = system_rename_column_family_args()
args.old_name = old_name
args.new_name = new_name
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_system_rename_column_family(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_system_rename_column_family(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = system_rename_column_family_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
- raise TApplicationException(TApplicationException.MISSING_RESULT, "system_rename_column_family failed: unknown result");
+ return d.errback(result.ire)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "system_rename_column_family failed: unknown result"))
def system_add_keyspace(self, ks_def):
"""
@@ -1225,32 +1302,35 @@ def system_add_keyspace(self, ks_def):
Parameters:
- ks_def
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_system_add_keyspace(ks_def)
- return self.recv_system_add_keyspace()
+ return d
def send_system_add_keyspace(self, ks_def):
- self._oprot.writeMessageBegin('system_add_keyspace', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('system_add_keyspace', TMessageType.CALL, self._seqid)
args = system_add_keyspace_args()
args.ks_def = ks_def
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_system_add_keyspace(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_system_add_keyspace(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = system_add_keyspace_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
- raise TApplicationException(TApplicationException.MISSING_RESULT, "system_add_keyspace failed: unknown result");
+ return d.errback(result.ire)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "system_add_keyspace failed: unknown result"))
def system_drop_keyspace(self, keyspace):
"""
@@ -1259,32 +1339,35 @@ def system_drop_keyspace(self, keyspace):
Parameters:
- keyspace
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_system_drop_keyspace(keyspace)
- return self.recv_system_drop_keyspace()
+ return d
def send_system_drop_keyspace(self, keyspace):
- self._oprot.writeMessageBegin('system_drop_keyspace', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('system_drop_keyspace', TMessageType.CALL, self._seqid)
args = system_drop_keyspace_args()
args.keyspace = keyspace
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_system_drop_keyspace(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_system_drop_keyspace(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = system_drop_keyspace_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
- raise TApplicationException(TApplicationException.MISSING_RESULT, "system_drop_keyspace failed: unknown result");
+ return d.errback(result.ire)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "system_drop_keyspace failed: unknown result"))
def system_rename_keyspace(self, old_name, new_name):
"""
@@ -1294,38 +1377,43 @@ def system_rename_keyspace(self, old_name, new_name):
- old_name
- new_name
"""
+ self._seqid += 1
+ d = self._reqs[self._seqid] = defer.Deferred()
self.send_system_rename_keyspace(old_name, new_name)
- return self.recv_system_rename_keyspace()
+ return d
def send_system_rename_keyspace(self, old_name, new_name):
- self._oprot.writeMessageBegin('system_rename_keyspace', TMessageType.CALL, self._seqid)
+ oprot = self._oprot_factory.getProtocol(self._transport)
+ oprot.writeMessageBegin('system_rename_keyspace', TMessageType.CALL, self._seqid)
args = system_rename_keyspace_args()
args.old_name = old_name
args.new_name = new_name
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
+ args.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
- def recv_system_rename_keyspace(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
+ def recv_system_rename_keyspace(self, iprot, mtype, rseqid):
+ d = self._reqs.pop(rseqid)
if mtype == TMessageType.EXCEPTION:
x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
+ x.read(iprot)
+ iprot.readMessageEnd()
+ return d.errback(x)
result = system_rename_keyspace_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
+ result.read(iprot)
+ iprot.readMessageEnd()
if result.success != None:
- return result.success
+ return d.callback(result.success)
if result.ire != None:
- raise result.ire
- raise TApplicationException(TApplicationException.MISSING_RESULT, "system_rename_keyspace failed: unknown result");
+ return d.errback(result.ire)
+ return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, "system_rename_keyspace failed: unknown result"))
-class Processor(Iface, TProcessor):
+class Processor(TProcessor):
+ implements(Iface)
+
def __init__(self, handler):
- self._handler = handler
+ self._handler = Iface(handler)
self._processMap = {}
self._processMap["login"] = Processor.process_login
self._processMap["set_keyspace"] = Processor.process_set_keyspace
@@ -1365,18 +1453,30 @@ def process(self, iprot, oprot):
x.write(oprot)
oprot.writeMessageEnd()
oprot.trans.flush()
- return
+ return defer.succeed(None)
else:
- self._processMap[name](self, seqid, iprot, oprot)
- return True
+ return self._processMap[name](self, seqid, iprot, oprot)
def process_login(self, seqid, iprot, oprot):
args = login_args()
args.read(iprot)
iprot.readMessageEnd()
result = login_result()
+ d = defer.maybeDeferred(self._handler.login, args.auth_request)
+ d.addCallback(self.write_results_success_login, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_login, result, seqid, oprot)
+ return d
+
+ def write_results_success_login(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("login", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_login(self, error, result, seqid, oprot):
try:
- result.success = self._handler.login(args.auth_request)
+ error.raiseException()
except AuthenticationException, authnx:
result.authnx = authnx
except AuthorizationException, authzx:
@@ -1391,8 +1491,21 @@ def process_set_keyspace(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = set_keyspace_result()
+ d = defer.maybeDeferred(self._handler.set_keyspace, args.keyspace)
+ d.addCallback(self.write_results_success_set_keyspace, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_set_keyspace, result, seqid, oprot)
+ return d
+
+ def write_results_success_set_keyspace(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("set_keyspace", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_set_keyspace(self, error, result, seqid, oprot):
try:
- self._handler.set_keyspace(args.keyspace)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
oprot.writeMessageBegin("set_keyspace", TMessageType.REPLY, seqid)
@@ -1405,8 +1518,21 @@ def process_get(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = get_result()
+ d = defer.maybeDeferred(self._handler.get, args.key, args.column_path, args.consistency_level)
+ d.addCallback(self.write_results_success_get, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_get, result, seqid, oprot)
+ return d
+
+ def write_results_success_get(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("get", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_get(self, error, result, seqid, oprot):
try:
- result.success = self._handler.get(args.key, args.column_path, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except NotFoundException, nfe:
@@ -1425,8 +1551,21 @@ def process_get_slice(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = get_slice_result()
+ d = defer.maybeDeferred(self._handler.get_slice, args.key, args.column_parent, args.predicate, args.consistency_level)
+ d.addCallback(self.write_results_success_get_slice, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_get_slice, result, seqid, oprot)
+ return d
+
+ def write_results_success_get_slice(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("get_slice", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_get_slice(self, error, result, seqid, oprot):
try:
- result.success = self._handler.get_slice(args.key, args.column_parent, args.predicate, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1443,8 +1582,21 @@ def process_get_count(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = get_count_result()
+ d = defer.maybeDeferred(self._handler.get_count, args.key, args.column_parent, args.predicate, args.consistency_level)
+ d.addCallback(self.write_results_success_get_count, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_get_count, result, seqid, oprot)
+ return d
+
+ def write_results_success_get_count(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("get_count", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_get_count(self, error, result, seqid, oprot):
try:
- result.success = self._handler.get_count(args.key, args.column_parent, args.predicate, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1461,8 +1613,21 @@ def process_multiget_slice(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = multiget_slice_result()
+ d = defer.maybeDeferred(self._handler.multiget_slice, args.keys, args.column_parent, args.predicate, args.consistency_level)
+ d.addCallback(self.write_results_success_multiget_slice, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_multiget_slice, result, seqid, oprot)
+ return d
+
+ def write_results_success_multiget_slice(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("multiget_slice", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_multiget_slice(self, error, result, seqid, oprot):
try:
- result.success = self._handler.multiget_slice(args.keys, args.column_parent, args.predicate, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1479,8 +1644,21 @@ def process_multiget_count(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = multiget_count_result()
+ d = defer.maybeDeferred(self._handler.multiget_count, args.keyspace, args.keys, args.column_parent, args.predicate, args.consistency_level)
+ d.addCallback(self.write_results_success_multiget_count, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_multiget_count, result, seqid, oprot)
+ return d
+
+ def write_results_success_multiget_count(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("multiget_count", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_multiget_count(self, error, result, seqid, oprot):
try:
- result.success = self._handler.multiget_count(args.keyspace, args.keys, args.column_parent, args.predicate, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1497,8 +1675,21 @@ def process_get_range_slices(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = get_range_slices_result()
+ d = defer.maybeDeferred(self._handler.get_range_slices, args.column_parent, args.predicate, args.range, args.consistency_level)
+ d.addCallback(self.write_results_success_get_range_slices, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_get_range_slices, result, seqid, oprot)
+ return d
+
+ def write_results_success_get_range_slices(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("get_range_slices", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_get_range_slices(self, error, result, seqid, oprot):
try:
- result.success = self._handler.get_range_slices(args.column_parent, args.predicate, args.range, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1515,8 +1706,21 @@ def process_get_indexed_slices(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = get_indexed_slices_result()
+ d = defer.maybeDeferred(self._handler.get_indexed_slices, args.column_parent, args.index_clause, args.column_predicate, args.consistency_level)
+ d.addCallback(self.write_results_success_get_indexed_slices, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_get_indexed_slices, result, seqid, oprot)
+ return d
+
+ def write_results_success_get_indexed_slices(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("get_indexed_slices", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_get_indexed_slices(self, error, result, seqid, oprot):
try:
- result.success = self._handler.get_indexed_slices(args.column_parent, args.index_clause, args.column_predicate, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1533,8 +1737,21 @@ def process_insert(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = insert_result()
+ d = defer.maybeDeferred(self._handler.insert, args.key, args.column_parent, args.column, args.consistency_level)
+ d.addCallback(self.write_results_success_insert, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_insert, result, seqid, oprot)
+ return d
+
+ def write_results_success_insert(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("insert", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_insert(self, error, result, seqid, oprot):
try:
- self._handler.insert(args.key, args.column_parent, args.column, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1551,8 +1768,21 @@ def process_remove(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = remove_result()
+ d = defer.maybeDeferred(self._handler.remove, args.key, args.column_path, args.clock, args.consistency_level)
+ d.addCallback(self.write_results_success_remove, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_remove, result, seqid, oprot)
+ return d
+
+ def write_results_success_remove(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("remove", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_remove(self, error, result, seqid, oprot):
try:
- self._handler.remove(args.key, args.column_path, args.clock, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1569,8 +1799,21 @@ def process_batch_mutate(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = batch_mutate_result()
+ d = defer.maybeDeferred(self._handler.batch_mutate, args.mutation_map, args.consistency_level)
+ d.addCallback(self.write_results_success_batch_mutate, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_batch_mutate, result, seqid, oprot)
+ return d
+
+ def write_results_success_batch_mutate(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("batch_mutate", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_batch_mutate(self, error, result, seqid, oprot):
try:
- self._handler.batch_mutate(args.mutation_map, args.consistency_level)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1587,8 +1830,21 @@ def process_truncate(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = truncate_result()
+ d = defer.maybeDeferred(self._handler.truncate, args.cfname)
+ d.addCallback(self.write_results_success_truncate, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_truncate, result, seqid, oprot)
+ return d
+
+ def write_results_success_truncate(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("truncate", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_truncate(self, error, result, seqid, oprot):
try:
- self._handler.truncate(args.cfname)
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
except UnavailableException, ue:
@@ -1603,8 +1859,21 @@ def process_check_schema_agreement(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = check_schema_agreement_result()
+ d = defer.maybeDeferred(self._handler.check_schema_agreement, )
+ d.addCallback(self.write_results_success_check_schema_agreement, result, seqid, oprot)
+ d.addErrback(self.write_results_exception_check_schema_agreement, result, seqid, oprot)
+ return d
+
+ def write_results_success_check_schema_agreement(self, success, result, seqid, oprot):
+ result.success = success
+ oprot.writeMessageBegin("check_schema_agreement", TMessageType.REPLY, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def write_results_exception_check_schema_agreement(self, error, result, seqid, oprot):
try:
- result.success = self._handler.check_schema_agreement()
+ error.raiseException()
except InvalidRequestException, ire:
result.ire = ire
oprot.writeMessageBegin("check_schema_agreement", TMessageType.REPLY, seqid)
@@ -1617,7 +1886,12 @@ def process_describe_keyspaces(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = describe_keyspaces_result()
- result.success = self._handler.describe_keyspaces()
+ d = defer.maybeDeferred(self._handler.describe_keyspaces, )
+ d.addCallback(self.write_results_success_describe_keyspaces, result, seqid, oprot)
+ return d
+
+ def write_results_success_describe_keyspaces(self, success, result, seqid, oprot):
+ result.success = success
oprot.writeMessageBegin("describe_keyspaces", TMessageType.REPLY, seqid)
result.write(oprot)
oprot.writeMessageEnd()
@@ -1628,7 +1902,12 @@ def process_describe_cluster_name(self, seqid, iprot, oprot):
args.read(iprot)
iprot.readMessageEnd()
result = describe_cluster_name_result()
- result.success = self._handler.describe_cluster_name()
+ d = defer.maybeDeferred(self._handler.describe_cluster_name, )
+ d.addCallback(self.write_results_success_describe_cluster_name, result, seqid, oprot)
+ return d
+
+ def write_results_success_describe_cluster_name(self, success, result, seqid, oprot):
+ result.success = success
oprot.writeMessageBegin("describe_cluster_name", TMessageType.REPLY, seqid)
result.write(oprot)
oprot.writeMessageEnd()
@@ -1639,7 +1918,12 @@ def process_describe_version(self, seqid, iprot, o