Skip to content

Commit

Permalink
Implemented HappyBase Batch.put().
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Sep 10, 2015
1 parent ee72395 commit c43dbb1
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 7 deletions.
33 changes: 30 additions & 3 deletions gcloud_bigtable/happybase/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@


import datetime
import six

from gcloud_bigtable._helpers import _microseconds_to_timestamp
from gcloud_bigtable.row import TimestampRange
Expand All @@ -27,7 +28,7 @@
_ONE_MILLISECOND = datetime.timedelta(microseconds=1000)


def _get_column_pairs(columns):
def _get_column_pairs(columns, require_qualifier=False):
"""Turns a list of column or column families in parsed pairs.
Turns a column family (``fam`` or ``fam:``) into a pair such
Expand All @@ -41,12 +42,19 @@ def _get_column_pairs(columns):
* an entire column family: ``fam`` or ``fam:``
* an single column: ``fam:col``
:type require_qualifier: bool
:param require_qualifier: Boolean indicating if the columns should
all have a qualifier or not.
:rtype: list
:returns: List of pairs, where the first element in each pair is the
column family and the second is the column qualifier
(or :data:`None`).
:raises: :class:`ValueError <exceptions.ValueError>` if any of the columns
are not of the expected format.
:class:`ValueError <exceptions.ValueError>` if
``require_qualifier`` is :data:`True` and one of the values is
for an entire column family
"""
column_pairs = []
for column in columns:
Expand All @@ -55,7 +63,11 @@ def _get_column_pairs(columns):
num_colons = column.count(':')
if num_colons == 0:
# column is a column family.
column_pairs.append([column, None])
if require_qualifier:
raise ValueError('column does not contain a qualifier',
column)
else:
column_pairs.append([column, None])
elif num_colons == 1:
column_pairs.append(column.split(':'))
else:
Expand Down Expand Up @@ -184,7 +196,22 @@ def put(self, row, data, wal=_WAL_SENTINEL):
:raises: :class:`ValueError <exceptions.ValueError>` if ``wal``
is used.
"""
raise NotImplementedError('Temporarily not implemented.')
if wal is not _WAL_SENTINEL:
raise ValueError('The wal argument cannot be used with '
'Cloud Bigtable.')

row_object = self._get_row(row)
# Make sure all the keys are valid before beginning
# to add mutations.
column_pairs = _get_column_pairs(six.iterkeys(data),
require_qualifier=True)
for column_family_id, column_qualifier in column_pairs:
value = data[column_family_id + ':' + column_qualifier]
row_object.set_cell(column_family_id, column_qualifier,
value, timestamp=self._timestamp)

self._mutation_count += len(data)
self._try_send()

def _delete_columns(self, columns, row_object):
"""Adds delete mutations for a list of columns and column families.
Expand Down
80 changes: 76 additions & 4 deletions gcloud_bigtable/happybase/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ def send(self):

class Test__get_column_pairs(unittest2.TestCase):

def _callFUT(self, columns):
def _callFUT(self, *args, **kwargs):
from gcloud_bigtable.happybase.batch import _get_column_pairs
return _get_column_pairs(columns)
return _get_column_pairs(*args, **kwargs)

def test_it(self):
columns = ['cf1', 'cf2:', 'cf3::', 'cf3:name1', 'cf3:name2']
Expand Down Expand Up @@ -57,6 +57,11 @@ def test_bad_columns_var(self):
with self.assertRaises(TypeError):
self._callFUT(columns)

def test_column_family_with_require_qualifier(self):
columns = ['a:']
with self.assertRaises(ValueError):
self._callFUT(columns, require_qualifier=True)


class TestBatch(unittest2.TestCase):

Expand Down Expand Up @@ -241,16 +246,79 @@ def make_low_level_table(*args, **kwargs):
# Check how the batch was updated.
self.assertEqual(batch._row_map, {row_key: mock_row})

def test_put(self):
def test_put_bad_wal(self):
from gcloud_bigtable.happybase.batch import _WAL_SENTINEL

table = object()
batch = self._makeOne(table)

row = 'row-key'
data = {}
wal = None
with self.assertRaises(NotImplementedError):

self.assertNotEqual(wal, _WAL_SENTINEL)
with self.assertRaises(ValueError):
batch.put(row, data, wal=wal)

def test_put(self):
import operator

table = object()
batch = self._makeOne(table)
batch._timestamp = timestamp = object()
row_key = 'row-key'
batch._row_map[row_key] = row = _MockRow()

col1_fam = 'cf1'
col1_qual = 'qual1'
value1 = 'value1'
col2_fam = 'cf2'
col2_qual = 'qual2'
value2 = 'value2'
data = {col1_fam + ':' + col1_qual: value1,
col2_fam + ':' + col2_qual: value2}

self.assertEqual(batch._mutation_count, 0)
self.assertEqual(row.set_cell_calls, [])
batch.put(row_key, data)
self.assertEqual(batch._mutation_count, 2)
# Since the calls depend on data.keys(), the order
# is non-deterministic.
first_elt = operator.itemgetter(0)
ordered_calls = sorted(row.set_cell_calls, key=first_elt)

cell1_args = (col1_fam, col1_qual, value1)
cell1_kwargs = {'timestamp': timestamp}
cell2_args = (col2_fam, col2_qual, value2)
cell2_kwargs = {'timestamp': timestamp}
self.assertEqual(ordered_calls, [
(cell1_args, cell1_kwargs),
(cell2_args, cell2_kwargs),
])

def test_put_call_try_send(self):
klass = self._getTargetClass()

class CallTrySend(klass):

try_send_calls = 0

def _try_send(self):
self.try_send_calls += 1

table = object()
batch = CallTrySend(table)

row_key = 'row-key'
batch._row_map[row_key] = _MockRow()

self.assertEqual(batch._mutation_count, 0)
self.assertEqual(batch.try_send_calls, 0)
# No data so that nothing happens
batch.put(row_key, data={})
self.assertEqual(batch._mutation_count, 0)
self.assertEqual(batch.try_send_calls, 1)

def test__delete_columns(self):
table = object()
time_range = object()
Expand Down Expand Up @@ -418,6 +486,7 @@ class _MockRow(object):
def __init__(self):
self.commits = 0
self.deletes = 0
self.set_cell_calls = []
self.delete_cell_calls = []
self.delete_cells_calls = []

Expand All @@ -427,6 +496,9 @@ def commit(self):
def delete(self):
self.deletes += 1

def set_cell(self, *args, **kwargs):
self.set_cell_calls.append((args, kwargs))

def delete_cell(self, *args, **kwargs):
self.delete_cell_calls.append((args, kwargs))

Expand Down

0 comments on commit c43dbb1

Please sign in to comment.