Browse files

Whitespace nazi

  • Loading branch information...
1 parent 9b54850 commit 947e6b982637c06ba7bc60ab26a0084a040e687c @thobbs thobbs committed Apr 13, 2011
Showing with 47 additions and 48 deletions.
  1. +37 −37 telephus/client.py
  2. +10 −11 telephus/protocol.py
View
74 telephus/client.py
@@ -21,22 +21,22 @@ class CassandraClient(object):
def __init__(self, manager, consistency=ConsistencyLevel.ONE):
self.manager = manager
self.consistency = consistency
-
+
def _time(self):
return int(time.time() * 1000000)
-
+
def _getparent(self, columnParentOrCF, super_column=None):
if isinstance(columnParentOrCF, str):
return ColumnParent(columnParentOrCF, super_column=super_column)
else:
return columnParentOrCF
-
+
def _getpath(self, columnPathOrCF, col, super_column=None):
if isinstance(columnPathOrCF, str):
return ColumnPath(columnPathOrCF, super_column=super_column, column=col)
else:
return columnPathOrCF
-
+
def _mkpred(self, names, start, finish, reverse, count):
if names:
srange = None
@@ -54,21 +54,21 @@ def _wait_for_schema_agreement(self):
d = defer.Deferred()
reactor.callLater(0.1, d.callback, True)
yield d
-
+
@defer.inlineCallbacks
def _push_system_request(self,req,retries=None,block=True):
result = yield self.manager.pushRequest(req, retries=retries)
if block: yield self._wait_for_schema_agreement()
defer.returnValue(result)
-
+
@requirekwargs('key', 'column_family')
def get(self, key=None, column_family=None, column=None, super_column=None, consistency=None,
retries=None):
cp = self._getpath(column_family, column, super_column)
consistency = consistency or self.consistency
req = ManagedThriftRequest('get', key, cp, consistency)
return self.manager.pushRequest(req, retries=retries)
-
+
@requirekwargs('key', 'column_family')
def get_slice(self, key=None, column_family=None, names=None, start='', finish='',
reverse=False, count=100, consistency=None, super_column=None,
@@ -78,12 +78,12 @@ def get_slice(self, key=None, column_family=None, names=None, start='', finish='
pred = self._mkpred(names, start, finish, reverse, count)
req = ManagedThriftRequest('get_slice', key, cp, pred, consistency)
return self.manager.pushRequest(req, retries=retries)
-
+
def multiget(self, keys=None, column_family=None, column=None, super_column=None,
consistency=None, retries=None):
return self.multiget_slice(keys, column_family, names=[column], count=100,
consistency=consistency, retries=retries, super_column=super_column)
-
+
@requirekwargs('keys', 'column_family')
def multiget_slice(self, keys=None, column_family=None, names=None, start='', finish='',
reverse=False, count=100, consistency=None, super_column=None,
@@ -93,31 +93,31 @@ def multiget_slice(self, keys=None, column_family=None, names=None, start='', fi
pred = self._mkpred(names, start, finish, reverse, count)
req = ManagedThriftRequest('multiget_slice', keys, cp, pred, consistency)
return self.manager.pushRequest(req, retries=retries)
-
+
@requirekwargs('keys', 'column_family')
def multiget_count(self, keys=None, column_family=None, super_column=None, start='', finish='',
- consistency=None, retries=None):
+ consistency=None, retries=None):
cp = self._getparent(column_family, super_column)
pred = self._mkpred(None, start, finish, False, 2147483647)
consistency = consistency or self.consistency
req = ManagedThriftRequest('multiget_count', keys, cp, pred, consistency)
return self.manager.pushRequest(req, retries=retries)
-
+
@requirekwargs('key', 'column_family')
def get_count(self, key=None, column_family=None, super_column=None, start='', finish='',
- consistency=None, retries=None):
+ consistency=None, retries=None):
cp = self._getparent(column_family, super_column)
pred = self._mkpred(None, start, finish, False, 2147483647)
consistency = consistency or self.consistency
req = ManagedThriftRequest('get_count', key, cp, pred, consistency)
return self.manager.pushRequest(req, retries=retries)
-
+
def get_key_range(self, columnParent, **kwargs):
return self.get_range_slices(columnParent, **kwargs)
-
+
def get_range_slice(self, columnParent, **kwargs):
return self.get_range_slices(columnParent, **kwargs)
-
+
@requirekwargs('column_family')
def get_range_slices(self, column_family=None, start='', finish='', column_start='',
column_finish='', names=None, count=100, column_count=100,
@@ -155,7 +155,7 @@ def insert(self, key=None, column_family=None, value=None, column=None, super_co
return self.manager.pushRequest(req, retries=retries)
@requirekwargs('key', 'column_family')
- def remove(self, key=None, column_family=None, column=None, super_column=None,
+ def remove(self, key=None, column_family=None, column=None, super_column=None,
timestamp=None, consistency=None, retries=None):
cp = self._getpath(column_family, column, super_column)
timestamp = timestamp or self._time()
@@ -173,7 +173,7 @@ def batch_insert(self, key=None, column_family=None, mapping=None, timestamp=Non
mutmap = {key: {column_family: self._mk_cols_or_supers(mapping, timestamp, ttl)}}
return self.batch_mutate(mutmap, timestamp=timestamp, consistency=consistency,
retries=retries)
-
+
@requirekwargs('cfmap')
def batch_remove(self, cfmap=None, start='', finish='', count=100, names=None,
reverse=False, consistency=None, timestamp=None, supercolumn=None,
@@ -187,7 +187,7 @@ def batch_remove(self, cfmap=None, start='', finish='', count=100, names=None,
mutmap[key][cf] = [Mutation(deletion=Deletion(timestamp, supercolumn, pred))]
req = ManagedThriftRequest('batch_mutate', mutmap, consistency)
return self.manager.pushRequest(req, retries=retries)
-
+
@requirekwargs('mutationmap')
def batch_mutate(self, mutationmap=None, timestamp=None, consistency=None, retries=None, ttl=None):
timestamp = timestamp or self._time()
@@ -209,7 +209,7 @@ def batch_mutate(self, mutationmap=None, timestamp=None, consistency=None, retri
mutmap[key][cf] = muts
req = ManagedThriftRequest('batch_mutate', mutmap, consistency)
return self.manager.pushRequest(req, retries=retries)
-
+
def _mk_cols_or_supers(self, mapping, timestamp, ttl=None, make_deletions=False):
if isinstance(mapping, list):
return mapping
@@ -234,59 +234,59 @@ def _mk_cols_or_supers(self, mapping, timestamp, ttl=None, make_deletions=False)
else:
raise TypeError('dict (of dicts) or list of Columns/SuperColumns expected')
return colsorsupers
-
+
def set_keyspace(self, keyspace):
return self.manager.set_keyspace(keyspace)
-
+
def login(self, credentials):
return self.manager.login(credentials)
-
+
def describe_keyspaces(self, retries=None):
req = ManagedThriftRequest('describe_keyspaces')
return self.manager.pushRequest(req, retries=retries)
-
+
def describe_keyspace(self, keyspace, retries=None):
req = ManagedThriftRequest('describe_keyspace', keyspace)
return self.manager.pushRequest(req, retries=retries)
def describe_cluster_name(self, retries=None):
req = ManagedThriftRequest('describe_cluster_name')
return self.manager.pushRequest(req, retries=retries)
-
+
def describe_partitioner(self, retries=None):
req = ManagedThriftRequest('describe_partitioner')
return self.manager.pushRequest(req, retries=retries)
-
+
def describe_snitch(self, retries=None):
req = ManagedThriftRequest('describe_snitch')
return self.manager.pushRequest(req, retries=retries)
-
+
def describe_ring(self, keyspace, retries=None):
req = ManagedThriftRequest('describe_ring', keyspace)
return self.manager.pushRequest(req, retries=retries)
-
+
def describe_splits(self, cfName, start_token, end_token, keys_per_split,
retries=None):
req = ManagedThriftRequest('describe_splits', cfName, start_token,
end_token, keys_per_split)
return self.manager.pushRequest(req, retries=retries)
-
+
def truncate(self, cfName, retries=None):
req = ManagedThriftRequest('truncate', cfName)
return self.manager.pushRequest(req, retries=retries)
-
+
def describe_schema_versions(self, retries=None):
req = ManagedThriftRequest('describe_schema_versions')
return self.manager.pushRequest(req, retries=retries)
-
+
def system_drop_column_family(self, cfName, retries=None, block=True):
req = ManagedThriftRequest('system_drop_column_family', cfName)
return self._push_system_request(req,retries=retries,block=block)
-
+
def system_drop_keyspace(self, keyspace, retries=None, block=True):
req = ManagedThriftRequest('system_drop_keyspace', keyspace)
return self._push_system_request(req,retries=retries,block=block)
-
+
if 0:
# these are disabled in Cassandra 0.7 right now
def system_rename_column_family(self, oldname, newname, retries=None, block=True):
@@ -297,20 +297,20 @@ def system_rename_column_family(self, oldname, newname, retries=None, block=True
def system_rename_keyspace(self, oldname, newname, retries=None, block=True):
req = ManagedThriftRequest('system_rename_keyspace', oldname, newname)
return self._push_system_request(req,retries=retries,block=block)
-
+
# TODO: make friendly
def system_add_column_family(self, cfDef, retries=None, block=True):
req = ManagedThriftRequest('system_add_column_family', cfDef)
return self._push_system_request(req,retries=retries,block=block)
-
+
def system_update_column_family(self, cfDef, retries=None, block=True):
req = ManagedThriftRequest('system_update_column_family', cfDef)
return self._push_system_request(req,retries=retries,block=block)
-
+
def system_add_keyspace(self, ksDef, retries=None, block=True):
req = ManagedThriftRequest('system_add_keyspace', ksDef)
return self._push_system_request(req,retries=retries,block=block)
-
+
def system_update_keyspace(self, ksDef, retries=None, block=True):
req = ManagedThriftRequest('system_update_keyspace', ksDef)
return self._push_system_request(req,retries=retries,block=block)
View
21 telephus/protocol.py
@@ -19,7 +19,7 @@ class APIMismatch(Exception):
class ManagedThriftRequest(object):
def __init__(self, method, *args):
- self.method = method
+ self.method = method
self.args = args
def match_thrift_version(ourversion, remoteversion):
@@ -80,12 +80,12 @@ def connectionLost(self, reason=None):
# exceptions, the manager handled our failure
TTwisted.ThriftClientProtocol.connectionLost(self, reason)
self.factory.clientGone(self)
-
+
def _complete(self, res=None):
self.deferred = None
self.factory.clientIdle(self)
return res
-
+
def submitRequest(self, request):
if not self.deferred:
fun = getattr(self.client, request.method, None)
@@ -98,11 +98,11 @@ def submitRequest(self, request):
return d
else:
raise ClientBusy
-
+
def abort(self):
self.aborted = True
self.transport.loseConnection()
-
+
class AuthenticatedThriftClientProtocol(ManagedThriftClientProtocol):
def __init__(self, client_class, keyspace, credentials, iprot_factory, oprot_factory=None):
ManagedThriftClientProtocol.__init__(self, client_class, iprot_factory, oprot_factory, keyspace=keyspace)
@@ -175,23 +175,23 @@ def clientGone(self, proto):
self._protos.remove(proto)
except ValueError:
pass
-
+
def set_keyspace(self, keyspace):
""" switch all connections to another keyspace """
self.keyspace = keyspace
dfrds = []
for p in self._protos:
dfrds.append(p.submitRequest(ManagedThriftRequest('set_keyspace', keyspace)))
return defer.gatherResults(dfrds)
-
+
def login(self, credentials):
""" authenticate all connections """
dfrds = []
for p in self._protos:
dfrds.append(p.submitRequest(ManagedThriftRequest('login',
AuthenticationRequest(credentials=credentials))))
return defer.gatherResults(dfrds)
-
+
def submitRequest(self, proto):
def reqError(err, req, d, r):
if err.check(InvalidRequestException, InvalidThriftRequest) or r < 1:
@@ -227,19 +227,18 @@ def _process((request, deferred, retries)):
callbackArgs=[deferred],
errbackArgs=[request, deferred, retries])
return self.queue.get().addCallback(_process)
-
+
def pushRequest(self, request, retries=None):
retries = retries or self.request_retries
d = defer.Deferred()
self._pending.append(d)
self.queue.put((request, d, retries))
return d
-
+
def shutdown(self):
self.stopTrying()
for p in self._protos:
if p.transport:
p.transport.loseConnection()
for d in self._pending:
if not d.called: d.errback(UserError(string="Shutdown requested"))
-

0 comments on commit 947e6b9

Please sign in to comment.