Merge pull request pycassa#215 from rbranson/atomic-batches
Support for atomic batches
thobbs committed Oct 3, 2013
2 parents d5f764f + 5224ada commit f52e654
Showing 4 changed files with 49 additions and 9 deletions.
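Before the per-file hunks, here is a minimal usage sketch of the API this commit adds, assembled from the docstring and tests below. The keyspace and column family names ('Keyspace1', 'Standard1') are placeholder assumptions, not part of the commit; atomic batches require Cassandra 1.2 or later.

import pycassa

# Placeholder schema names; not part of this commit.
pool = pycassa.ConnectionPool('Keyspace1')
cf = pycassa.ColumnFamily(pool, 'Standard1')

# Ask for atomicity when the batch is created...
b = cf.batch(atomic=True)
b.insert('key1', {'col1': 'val1'})
b.insert('key2', {'col1': 'val1'})
b.send()

# ...or only when it is sent.
b = cf.batch()
b.insert('key3', {'col1': 'val1'})
b.send(atomic=True)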
pycassa/batch.py: 32 changes (26 additions, 6 deletions)
@@ -60,6 +60,22 @@
>>> cf.batch().remove('foo').remove('bar').send()
To use atomic batches (supported in Cassandra 1.2 and later), pass the atomic
option when creating the batch:

.. code-block:: python

    >>> cf.batch(atomic=True)

or when sending it:

.. code-block:: python

    >>> b = cf.batch()
    >>> b.insert('key1', {'col1':'val2'})
    >>> b.insert('key2', {'col1':'val2'})
    >>> b.send(atomic=True)
"""

import threading
@@ -75,7 +91,7 @@ class Mutator(object):
    is full or `send` is called explicitly.
    """

    def __init__(self, pool, queue_size=100, write_consistency_level=None, allow_retries=True):
    def __init__(self, pool, queue_size=100, write_consistency_level=None, allow_retries=True, atomic=False):
        """
        `pool` is the :class:`~pycassa.pool.ConnectionPool` that will be used
        for operations.
@@ -88,6 +104,7 @@ def __init__(self, pool, queue_size=100, write_consistency_level=None, allow_ret
        self.pool = pool
        self.limit = queue_size
        self.allow_retries = allow_retries
        self.atomic = atomic
        if write_consistency_level is None:
            self.write_consistency_level = ConsistencyLevel.ONE
        else:
@@ -110,10 +127,12 @@ def _enqueue(self, key, column_family, mutations):
        self._lock.release()
        return self

    def send(self, write_consistency_level=None):
    def send(self, write_consistency_level=None, atomic=None):
        """ Sends all operations currently in the batch and clears the batch. """
        if write_consistency_level is None:
            write_consistency_level = self.write_consistency_level
        if atomic is None:
            atomic = self.atomic
        mutations = {}
        conn = None
        self._lock.acquire()
@@ -122,8 +141,9 @@ def send(self, write_consistency_level=None):
                mutations.setdefault(key, {}).setdefault(column_family, []).extend(cols)
            if mutations:
                conn = self.pool.get()
                conn.batch_mutate(mutations, write_consistency_level,
                                  allow_retries=self.allow_retries)
                mutatefn = conn.atomic_batch_mutate if atomic else conn.batch_mutate
                mutatefn(mutations, write_consistency_level,
                         allow_retries=self.allow_retries)
            self._buffer = []
        finally:
            if conn:
@@ -179,13 +199,13 @@ class CfMutator(Mutator):
    """

    def __init__(self, column_family, queue_size=100, write_consistency_level=None,
                 allow_retries=True):
                 allow_retries=True, atomic=False):
        """
        `column_family` is the :class:`~pycassa.columnfamily.ColumnFamily`
        that all operations will be executed on.
        """
        wcl = write_consistency_level or column_family.write_consistency_level
        Mutator.__init__(self, column_family.pool, queue_size, wcl, allow_retries)
        Mutator.__init__(self, column_family.pool, queue_size, wcl, allow_retries, atomic)
        self._column_family = column_family

    def insert(self, key, cols, timestamp=None, ttl=None):
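The send-time flag only takes effect when it is passed explicitly; otherwise the mutator-level default applies. A small standalone sketch of that None-sentinel precedence (illustrative only, not the library code):

# Standalone sketch of the precedence implemented in Mutator.send() above:
# an explicit send-time value wins, otherwise the creation-time default is used.
class TinyMutator(object):
    def __init__(self, atomic=False):
        self.atomic = atomic

    def send(self, atomic=None):
        if atomic is None:           # nothing passed at send time...
            atomic = self.atomic     # ...fall back to the creation-time default
        return 'atomic_batch_mutate' if atomic else 'batch_mutate'

assert TinyMutator().send() == 'batch_mutate'
assert TinyMutator(atomic=True).send() == 'atomic_batch_mutate'
assert TinyMutator().send(atomic=True) == 'atomic_batch_mutate'
assert TinyMutator(atomic=True).send(atomic=False) == 'batch_mutate'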
pycassa/columnfamily.py: 5 changes (3 additions, 2 deletions)
@@ -1107,7 +1107,7 @@ def remove_counter(self, key, column, super_column=None, write_consistency_level
        self.pool.execute('remove_counter', packed_key, cp,
                          write_consistency_level or self.write_consistency_level)

    def batch(self, queue_size=100, write_consistency_level=None):
    def batch(self, queue_size=100, write_consistency_level=None, atomic=None):
        """
        Create batch mutator for doing multiple insert, update, and remove
        operations using as few roundtrips as possible.
@@ -1120,7 +1120,8 @@ def batch(self, queue_size=100, write_consistency_level=None):

        return CfMutator(self, queue_size,
                         write_consistency_level or self.write_consistency_level,
                         allow_retries=self._allow_retries)
                         allow_retries=self._allow_retries,
                         atomic=atomic)

    def truncate(self):
        """
pycassa/pool.py: 3 changes (2 additions, 1 deletion)
@@ -192,7 +192,8 @@ def __str__(self):

retryable = ('get', 'get_slice', 'multiget_slice', 'get_count', 'multiget_count',
             'get_range_slices', 'get_indexed_slices', 'batch_mutate', 'add',
             'insert', 'remove', 'remove_counter', 'truncate', 'describe_keyspace')
             'insert', 'remove', 'remove_counter', 'truncate', 'describe_keyspace',
             'atomic_batch_mutate')
for fname in retryable:
    new_f = ConnectionWrapper._retry(getattr(Connection, fname))
    setattr(ConnectionWrapper, fname, new_f)
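Adding 'atomic_batch_mutate' to the retryable tuple is all the new Thrift call needs to pick up the pool's retry behavior, because the wrapping is applied by name at import time. A standalone sketch of that pattern, with an assumed simplified retry policy rather than pycassa's actual _retry logic:

# Illustrative sketch only: wrap each named method with retry logic at import
# time, mirroring the retryable-tuple loop above. The single naive retry on any
# Exception is an assumption for brevity, not pycassa's real policy.
class Connection(object):
    def batch_mutate(self, mutations):
        return 'batch_mutate: %d row(s)' % len(mutations)

    def atomic_batch_mutate(self, mutations):
        return 'atomic_batch_mutate: %d row(s)' % len(mutations)

class ConnectionWrapper(Connection):
    @staticmethod
    def _retry(f):
        def new_f(self, *args, **kwargs):
            try:
                return f(self, *args, **kwargs)
            except Exception:
                return f(self, *args, **kwargs)  # naive single retry
        new_f.__name__ = f.__name__
        return new_f

retryable = ('batch_mutate', 'atomic_batch_mutate')
for fname in retryable:
    new_f = ConnectionWrapper._retry(getattr(Connection, fname))
    setattr(ConnectionWrapper, fname, new_f)

print(ConnectionWrapper().atomic_batch_mutate({'key1': []}))  # retried wrapper in use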
tests/test_batch_mutation.py: 18 changes (18 additions, 0 deletions)
@@ -132,3 +132,21 @@ def test_multi_column_family(self):
        batch.send()
        assert cf.get('2') == ROWS['2']
        assert_raises(NotFoundException, cf.get, '1')

    def test_atomic_insert_at_mutator_creation(self):
        batch = cf.batch(atomic=True)
        for key, cols in ROWS.iteritems():
            batch.insert(key, cols)
        batch.send()
        for key, cols in ROWS.items():
            assert cf.get(key) == cols

    def test_atomic_insert_at_send(self):
        batch = cf.batch()
        for key, cols in ROWS.iteritems():
            batch.insert(key, cols)
        batch.send(atomic=True)
        for key, cols in ROWS.items():
            assert cf.get(key) == cols

