Skip to content

Commit

Permalink
Implementing HappyBase Table.rows().
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Sep 18, 2015
1 parent 268af73 commit 6ef5ecb
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 21 deletions.
95 changes: 83 additions & 12 deletions gcloud_bigtable/happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def _filter_chain_helper(column=None, versions=None, timestamp=None,
def _columns_filter_helper(columns):
"""Creates a union filter for a list of columns.
:type columns:
:type columns: list
:param columns: Iterable containing column names (as strings). Each column
name can be either
Expand Down Expand Up @@ -270,6 +270,56 @@ def _columns_filter_helper(columns):
return RowFilterUnion(filters=filters)


def _row_keys_filter_helper(row_keys):
    """Creates a union filter for a list of rows.

    :type row_keys: list
    :param row_keys: Iterable containing row keys (as strings).

    :rtype: :class:`.RowFilterUnion`, :class:`.RowFilter`
    :returns: The union filter created containing all of the row keys.

    :raises: :class:`ValueError <exceptions.ValueError>` if there are no
             filters to union.
    """
    filters = [RowFilter(row_key_regex_filter=row_key)
               for row_key in row_keys]

    if not filters:
        raise ValueError('Must have at least one filter.')
    # A union over a single filter is just that filter.
    if len(filters) == 1:
        return filters[0]
    return RowFilterUnion(filters=filters)


def _partial_row_to_dict(partial_row_data, include_timestamp=False):
    """Convert a low-level row data object to a dictionary.

    Assumes only the latest value in each row is needed (e.g. different
    behavior than in :meth:`Table.cells`).

    :type partial_row_data: :class:`.row_data.PartialRowData`
    :param partial_row_data: Row data consumed from a stream.

    :type include_timestamp: bool
    :param include_timestamp: Flag to indicate if cell timestamps should be
                              included with the output.

    :rtype: dict
    :returns: The row data converted to a dictionary.
    """
    # NOTE: We assume there is exactly 1 version since we used that in
    #       our filter, but we don't check this.
    row_contents = partial_row_data.to_dict()
    return {
        column: _cells_to_pairs(
            cells, include_timestamp=include_timestamp)[0]
        for column, cells in six.iteritems(row_contents)
    }


class Table(object):
"""Representation of Cloud Bigtable table.
Expand Down Expand Up @@ -361,14 +411,8 @@ def row(self, row, columns=None, timestamp=None, include_timestamp=False):
if partial_row_data is None:
return {}

result = {}
for column, cells in six.iteritems(partial_row_data.to_dict()):
cell_vals = _cells_to_pairs(cells,
include_timestamp=include_timestamp)
# NOTE: We assume there is exactly 1 version since we used that in
# our filter, but we don't check this.
result[column] = cell_vals[0]
return result
return _partial_row_to_dict(partial_row_data,
include_timestamp=include_timestamp)

def rows(self, rows, columns=None, timestamp=None,
         include_timestamp=False):
    """Retrieve multiple rows of data.

    All optional arguments behave the same in this method as they do in
    :meth:`row`.

    :type rows: list
    :param rows: Iterable of the row keys for the rows we are reading from.

    :type columns: list
    :param columns: (Optional) Iterable containing column names (as
                    strings). Each column name can be either

                      * an entire column family: ``fam`` or ``fam:``
                      * a single column: ``fam:col``

    :type timestamp: int
    :param timestamp: (Optional) Timestamp (in milliseconds since the
                      epoch). If specified, only cells returned before (or
                      at) the timestamp will be returned.

    :type include_timestamp: bool
    :param include_timestamp: Flag to indicate if cell timestamps should be
                              included with the output.

    :rtype: list
    :returns: A list of pairs, where the first is the row key and the
              second is a dictionary with the filtered values returned.
    """
    if not rows:
        # Avoid round-trip if the result is empty anyway
        return []

    filters = []
    if columns is not None:
        filters.append(_columns_filter_helper(columns))
    filters.append(_row_keys_filter_helper(rows))
    # versions == 1 since we only want the latest.
    filter_ = _filter_chain_helper(versions=1, timestamp=timestamp,
                                   filters=filters)

    partial_rows_data = self._low_level_table.read_rows(filter_=filter_)
    # NOTE: We could use max_loops = 1000 or some similar value to ensure
    #       that the stream isn't open too long.
    partial_rows_data.consume_all()

    result = []
    # Iterate over the requested keys (rather than the returned mapping)
    # so the output order matches the input order. A single ``.get()``
    # replaces the original membership test + indexing double lookup.
    for row_key in rows:
        curr_row_data = partial_rows_data.rows.get(row_key)
        if curr_row_data is None:
            # Keys with no data in the response are skipped, matching
            # HappyBase semantics.
            continue
        curr_row_dict = _partial_row_to_dict(
            curr_row_data, include_timestamp=include_timestamp)
        result.append((row_key, curr_row_dict))

    return result

def cells(self, row, column, versions=None, timestamp=None,
include_timestamp=False):
Expand Down
169 changes: 161 additions & 8 deletions gcloud_bigtable/happybase/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,43 @@ def test_column_and_column_familieis(self):
self.assertEqual(filter2b.column_qualifier_regex_filter, col_qual2)


class Test__row_keys_filter_helper(unittest2.TestCase):
    """Unit tests for the ``_row_keys_filter_helper`` factory."""

    def _callFUT(self, *args, **kwargs):
        from gcloud_bigtable.happybase.table import _row_keys_filter_helper
        return _row_keys_filter_helper(*args, **kwargs)

    def test_no_rows(self):
        # An empty iterable of row keys cannot produce a filter.
        with self.assertRaises(ValueError):
            self._callFUT([])

    def test_single_row(self):
        from gcloud_bigtable.row import RowFilter

        row_key = b'row-key'
        result = self._callFUT([row_key])
        # A single key yields a bare RowFilter rather than a union.
        self.assertEqual(result, RowFilter(row_key_regex_filter=row_key))

    def test_many_rows(self):
        from gcloud_bigtable.row import RowFilter
        from gcloud_bigtable.row import RowFilterUnion

        row_keys = [b'row-key1', b'row-key2', b'row-key3']
        result = self._callFUT(row_keys)

        # Multiple keys produce a union of per-key regex filters, in order.
        expected_filters = [RowFilter(row_key_regex_filter=row_key)
                            for row_key in row_keys]
        expected_result = RowFilterUnion(filters=expected_filters)
        self.assertEqual(result, expected_result)


class TestTable(unittest2.TestCase):

def _getTargetClass(self):
Expand Down Expand Up @@ -544,18 +581,118 @@ def test_row_with_results(self):
mock_cells_to_pairs.check_called(
self, [(fake_cells,)], [to_pairs_kwargs])

def test_rows(self):
def test_rows_empty_row(self):
    """Calling rows() with no keys short-circuits to an empty list.

    No low-level table is attached, so this also proves the empty-input
    path never touches ``read_rows``.
    """
    # NOTE: The unused ``row_keys``/``columns``/``timestamp``/
    #       ``include_timestamp`` locals left over from the old
    #       NotImplementedError-era test have been removed.
    name = 'table-name'
    connection = None
    table = self._makeOne(name, connection)

    result = table.rows([])
    self.assertEqual(result, [])

def test_rows_with_columns(self):
    """rows() with columns= chains column, key and version filters."""
    from gcloud_bigtable._testing import _MockCalled
    from gcloud_bigtable._testing import _Monkey
    from gcloud_bigtable.happybase import table as MUT

    name = 'table-name'
    connection = None
    table = self._makeOne(name, connection)
    table._low_level_table = _MockLowLevelTable()
    rr_result = _MockPartialRowsData()
    table._low_level_table.read_rows_result = rr_result
    self.assertEqual(rr_result.consume_all_calls, 0)

    # Each helper is patched out and returns a distinct sentinel so we
    # can track exactly what gets threaded through to read_rows().
    fake_col_filter = object()
    fake_rows_filter = object()
    fake_filter = object()
    mock_columns_filter_helper = _MockCalled(fake_col_filter)
    mock_row_keys_filter_helper = _MockCalled(fake_rows_filter)
    mock_filter_chain_helper = _MockCalled(fake_filter)

    rows = ['row-key']
    columns = object()
    with _Monkey(MUT, _filter_chain_helper=mock_filter_chain_helper,
                 _row_keys_filter_helper=mock_row_keys_filter_helper,
                 _columns_filter_helper=mock_columns_filter_helper):
        result = table.rows(rows, columns=columns)

    # read_rows_result == Empty PartialRowsData --> No results.
    self.assertEqual(result, [])

    self.assertEqual(table._low_level_table.read_rows_calls,
                     [((), {'filter_': fake_filter})])
    self.assertEqual(rr_result.consume_all_calls, 1)

    mock_columns_filter_helper.check_called(self, [(columns,)])
    mock_row_keys_filter_helper.check_called(self, [(rows,)])
    expected_kwargs = {
        'filters': [fake_col_filter, fake_rows_filter],
        'versions': 1,
        'timestamp': None,
    }
    mock_filter_chain_helper.check_called(self, [()], [expected_kwargs])

def test_rows_with_results(self):
    """rows() returns (key, dict) pairs only for keys present in the stream."""
    from gcloud_bigtable._testing import _MockCalled
    from gcloud_bigtable._testing import _Monkey
    from gcloud_bigtable.happybase import table as MUT
    from gcloud_bigtable.row_data import PartialRowData

    row_key1 = 'row-key1'
    row_key2 = 'row-key2'
    rows = [row_key1, row_key2]
    name = 'table-name'
    connection = None
    table = self._makeOne(name, connection)
    table._low_level_table = _MockLowLevelTable()

    row1 = PartialRowData(row_key1)
    # Return row1 but not row2
    rr_result = _MockPartialRowsData(rows={row_key1: row1})
    table._low_level_table.read_rows_result = rr_result
    self.assertEqual(rr_result.consume_all_calls, 0)

    # Patch out the filter helpers and pair conversion with sentinels so
    # the assertions below can verify exactly how they were invoked.
    fake_rows_filter = object()
    mock_row_keys_filter_helper = _MockCalled(fake_rows_filter)
    fake_filter = object()
    mock_filter_chain_helper = _MockCalled(fake_filter)
    fake_pair = object()
    mock_cells_to_pairs = _MockCalled([fake_pair])

    # Seed row1 with a single cell so the result dict has one entry.
    col_fam = u'cf1'
    qual = b'qual'
    fake_cells = object()
    row1._cells = {col_fam: {qual: fake_cells}}
    include_timestamp = object()
    with _Monkey(MUT, _row_keys_filter_helper=mock_row_keys_filter_helper,
                 _filter_chain_helper=mock_filter_chain_helper,
                 _cells_to_pairs=mock_cells_to_pairs):
        result = table.rows(rows, include_timestamp=include_timestamp)

    # read_rows_result == PartialRowsData with row_key1
    expected_result = {col_fam.encode('ascii') + b':' + qual: fake_pair}
    self.assertEqual(result, [(row_key1, expected_result)])

    # The low-level read_rows() should be called once with the chained
    # filter, and the stream consumed exactly once.
    read_rows_args = ()
    read_rows_kwargs = {'filter_': fake_filter}
    self.assertEqual(table._low_level_table.read_rows_calls, [
        (read_rows_args, read_rows_kwargs),
    ])
    self.assertEqual(rr_result.consume_all_calls, 1)

    # No columns were passed, so only the row-keys filter feeds the chain.
    mock_row_keys_filter_helper.check_called(self, [(rows,)])
    expected_kwargs = {
        'filters': [fake_rows_filter],
        'versions': 1,
        'timestamp': None,
    }
    mock_filter_chain_helper.check_called(self, [()], [expected_kwargs])
    to_pairs_kwargs = {'include_timestamp': include_timestamp}
    mock_cells_to_pairs.check_called(
        self, [(fake_cells,)], [to_pairs_kwargs])

def test_cells_empty_row(self):
from gcloud_bigtable._testing import _MockCalled
Expand Down Expand Up @@ -934,6 +1071,8 @@ def __init__(self, *args, **kwargs):
self.row_values = {}
self.read_row_calls = []
self.read_row_result = None
self.read_rows_calls = []
self.read_rows_result = None

def list_column_families(self):
self.list_column_families_calls += 1
Expand All @@ -946,6 +1085,10 @@ def read_row(self, *args, **kwargs):
self.read_row_calls.append((args, kwargs))
return self.read_row_result

def read_rows(self, *args, **kwargs):
self.read_rows_calls.append((args, kwargs))
return self.read_rows_result


class _MockLowLevelColumnFamily(object):

Expand Down Expand Up @@ -998,3 +1141,13 @@ def increment_cell_value(self, column_family_id, column, int_value):

def commit_modifications(self):
    # Return the canned result configured by the test; no real commit.
    return self.commit_result


class _MockPartialRowsData(object):

def __init__(self, rows=None):
self.rows = rows or {}
self.consume_all_calls = 0

def consume_all(self):
self.consume_all_calls += 1
2 changes: 1 addition & 1 deletion gcloud_bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def rows(self):
:rtype: dict
:returns: Dictionary of :class:`PartialRowData`.
"""
# NOTE: To avoid duplication large objects, this is just the
# NOTE: To avoid duplicating large objects, this is just the
# mutable private data.
return self._rows

Expand Down

0 comments on commit 6ef5ecb

Please sign in to comment.