Skip to content

Commit

Permalink
Implemented HappyBase Table.cells().
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Sep 11, 2015
1 parent 1ec4eed commit 4341833
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 13 deletions.
80 changes: 77 additions & 3 deletions gcloud_bigtable/happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import struct

from gcloud_bigtable._helpers import _microseconds_to_timestamp
from gcloud_bigtable._helpers import _timestamp_to_microseconds
from gcloud_bigtable.column_family import GarbageCollectionRule
from gcloud_bigtable.column_family import GarbageCollectionRuleIntersection
from gcloud_bigtable.happybase.batch import Batch
from gcloud_bigtable.happybase.batch import _WAL_SENTINEL
from gcloud_bigtable.row import RowFilter
from gcloud_bigtable.row import RowFilterChain
from gcloud_bigtable.row import TimestampRange
from gcloud_bigtable.table import Table as _LowLevelTable

Expand Down Expand Up @@ -150,6 +153,63 @@ def _convert_to_time_range(timestamp=None):
return TimestampRange(end=next_timestamp)


def _cells_to_pairs(cells, include_timestamp=False):
"""Converts list of cells to HappyBase format.
:type cells: list
:param cells: List of :class:`.Cell` returned from a read request.
:type include_timestamp: bool
:param include_timestamp: Flag to indicate if cell timestamps should be
included with the output.
:rtype: list
:returns: List of values in the cell. If ``include_timestamp=True``, each
value will be a pair, with the first part the bytes value in
the cell and the second part the number of milliseconds in the
timestamp on the cell.
"""
result = []
for cell in cells:
if include_timestamp:
ts_millis = _timestamp_to_microseconds(cell.timestamp) // 1000
result.append((cell.value, ts_millis))
else:
result.append(cell.value)
return result


def _filter_for_cell(column, versions=None, timestamp=None):
"""Create filter chain to isolate a single column in a row.
:type column: str
:param column: The column (``fam:col``) to be selected
with the filter.
:type versions: int
:param versions: (Optional) The maximum number of cells to return.
:type timestamp: int
:param timestamp: (Optional) Timestamp (in milliseconds since the
epoch). If specified, only cells returned before (or
at) the timestamp will be matched.
:rtype: :class:`.RowFilterChain`
:returns: The chained filter which identifies a single column.
"""
column_family_id, column_qualifier = column.split(':')
fam_filter = RowFilter(family_name_regex_filter=column_family_id)
qual_filter = RowFilter(column_qualifier_regex_filter=column_qualifier)
filters = [fam_filter, qual_filter]
if versions is not None:
filters.append(RowFilter(cells_per_column_limit_filter=versions))
time_range = _convert_to_time_range(timestamp=timestamp)
if time_range is not None:
filters.append(RowFilter(timestamp_range_filter=time_range))

return RowFilterChain(filters=filters)


class Table(object):
"""Representation of Cloud Bigtable table.
Expand Down Expand Up @@ -284,10 +344,24 @@ def cells(self, row, column, versions=None, timestamp=None,
:param include_timestamp: Flag to indicate if cell timestamps should be
included with the output.
:raises: :class:`NotImplementedError <exceptions.NotImplementedError>`
temporarily until the method is implemented.
:rtype: list
:returns: List of values in the cell (with timestamps if
``include_timestamp`` is :data:`True`).
"""
raise NotImplementedError('Temporarily not implemented.')
filter_ = _filter_for_cell(column, versions=versions,
timestamp=timestamp)
partial_row_data = self._low_level_table.read_row(row, filter_=filter_)
if partial_row_data is None:
return []
else:
cells = partial_row_data._cells
# NOTE: We expect the only key in `cells` is `column_family_id`
# and the only key `cells[column_family_id]` is
# `column_qualifier`. But we don't check that this is true.
column_family_id, column_qualifier = column.split(':')
curr_cells = cells[column_family_id][column_qualifier]
return _cells_to_pairs(
curr_cells, include_timestamp=include_timestamp)

def scan(self, row_start=None, row_stop=None, row_prefix=None,
columns=None, filter=None, timestamp=None,
Expand Down
198 changes: 188 additions & 10 deletions gcloud_bigtable/happybase/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ def test_it(self):

class Test__gc_rule_to_dict(unittest2.TestCase):

def _callFUT(self, gc_rule):
def _callFUT(self, *args, **kwargs):
from gcloud_bigtable.happybase.table import _gc_rule_to_dict
return _gc_rule_to_dict(gc_rule)
return _gc_rule_to_dict(*args, **kwargs)

def test_with_null(self):
gc_rule = None
Expand Down Expand Up @@ -158,6 +158,111 @@ def test_success(self):
self.assertEqual(result.end, next_ts)


class Test__cells_to_pairs(unittest2.TestCase):

def _callFUT(self, *args, **kwargs):
from gcloud_bigtable.happybase.table import _cells_to_pairs
return _cells_to_pairs(*args, **kwargs)

def test_without_timestamp(self):
from gcloud_bigtable.row_data import Cell

value1 = 'foo'
cell1 = Cell(value=value1, timestamp=None)
value2 = 'bar'
cell2 = Cell(value=value2, timestamp=None)

result = self._callFUT([cell1, cell2])
self.assertEqual(result, [value1, value2])

def test_with_timestamp(self):
from gcloud_bigtable._helpers import _microseconds_to_timestamp
from gcloud_bigtable.row_data import Cell

value1 = 'foo'
ts1_millis = 1221934570148
ts1 = _microseconds_to_timestamp(ts1_millis * 1000)
cell1 = Cell(value=value1, timestamp=ts1)

value2 = 'bar'
ts2_millis = 1221955575548
ts2 = _microseconds_to_timestamp(ts2_millis * 1000)
cell2 = Cell(value=value2, timestamp=ts2)

result = self._callFUT([cell1, cell2], include_timestamp=True)
self.assertEqual(result,
[(value1, ts1_millis), (value2, ts2_millis)])


class Test__filter_for_cell(unittest2.TestCase):

def _callFUT(self, *args, **kwargs):
from gcloud_bigtable.happybase.table import _filter_for_cell
return _filter_for_cell(*args, **kwargs)

def _helper(self, num_filters, versions=None, timestamp=None):
from gcloud_bigtable.row import RowFilter
from gcloud_bigtable.row import RowFilterChain

col_fam = 'cf1'
qual = 'qual'
column = col_fam + ':' + qual
result = self._callFUT(column, versions=versions, timestamp=timestamp)
self.assertTrue(isinstance(result, RowFilterChain))

self.assertEqual(len(result.filters), num_filters)
fam_filter = result.filters[0]
qual_filter = result.filters[1]
self.assertTrue(isinstance(fam_filter, RowFilter))
self.assertTrue(isinstance(qual_filter, RowFilter))

# Relies on the fact that RowFilter instances can
# only have one value set.
self.assertEqual(fam_filter.family_name_regex_filter, col_fam)
self.assertEqual(qual_filter.column_qualifier_regex_filter, qual)

return result

def test_column_only(self):
self._helper(num_filters=2)

def test_with_versions(self):
from gcloud_bigtable.row import RowFilter

versions = 11
result = self._helper(num_filters=3, versions=versions)

version_filter = result.filters[2]
self.assertTrue(isinstance(version_filter, RowFilter))
# Relies on the fact that RowFilter instances can
# only have one value set.
self.assertEqual(version_filter.cells_per_column_limit_filter,
versions)

def test_with_timestamp(self):
from gcloud_bigtable._helpers import _microseconds_to_timestamp
from gcloud_bigtable.row import RowFilter
from gcloud_bigtable.row import TimestampRange

timestamp = 1441928298571
result = self._helper(num_filters=3, timestamp=timestamp)

range_filter = result.filters[2]
self.assertTrue(isinstance(range_filter, RowFilter))
# Relies on the fact that RowFilter instances can
# only have one value set.
time_range = range_filter.timestamp_range_filter
self.assertTrue(isinstance(time_range, TimestampRange))
self.assertEqual(time_range.start, None)
next_ts = _microseconds_to_timestamp(1000 * (timestamp + 1))
self.assertEqual(time_range.end, next_ts)

def test_with_all_options(self):
versions = 11
timestamp = 1441928298571
self._helper(num_filters=4, versions=versions, timestamp=timestamp)


class TestTable(unittest2.TestCase):

def _getTargetClass(self):
Expand Down Expand Up @@ -267,20 +372,87 @@ def test_rows(self):
table.rows(row_keys, columns=columns, timestamp=timestamp,
include_timestamp=include_timestamp)

def test_cells(self):
def test_cells_empty_row(self):
from gcloud_bigtable._testing import _MockCalled
from gcloud_bigtable._testing import _Monkey
from gcloud_bigtable.happybase import table as MUT

name = 'table-name'
connection = None
table = self._makeOne(name, connection)
table._low_level_table = _MockLowLevelTable()
table._low_level_table.read_row_result = None

fake_filter = object()
mock_filter_for_cell = _MockCalled(fake_filter)

row_key = 'row-key'
column = 'fam:col1'
versions = 11
timestamp = None
include_timestamp = True
with self.assertRaises(NotImplementedError):
table.cells(row_key, column, versions=versions,
timestamp=timestamp,
include_timestamp=include_timestamp)
with _Monkey(MUT, _filter_for_cell=mock_filter_for_cell):
result = table.cells(row_key, column)

# read_row_result == None --> No results.
self.assertEqual(result, [])

read_row_args = (row_key,)
read_row_kwargs = {'filter_': fake_filter}
self.assertEqual(table._low_level_table.read_row_calls, [
(read_row_args, read_row_kwargs),
])

expected_kwargs = {'versions': None, 'timestamp': None}
mock_filter_for_cell.check_called(
self, [(column,)], [expected_kwargs])

def test_cells_with_results(self):
from gcloud_bigtable._testing import _MockCalled
from gcloud_bigtable._testing import _Monkey
from gcloud_bigtable.happybase import table as MUT
from gcloud_bigtable.row_data import PartialRowData

row_key = 'row-key'
name = 'table-name'
connection = None
table = self._makeOne(name, connection)
table._low_level_table = _MockLowLevelTable()
partial_row = PartialRowData(row_key)
table._low_level_table.read_row_result = partial_row

# These are all passed to mocks.
versions = object()
timestamp = object()
include_timestamp = object()

fake_filter = object()
mock_filter_for_cell = _MockCalled(fake_filter)
fake_result = object()
mock_cells_to_pairs = _MockCalled(fake_result)

col_fam = 'cf1'
qual = 'qual'
fake_cells = object()
partial_row._cells = {col_fam: {qual: fake_cells}}
column = col_fam + ':' + qual
with _Monkey(MUT, _filter_for_cell=mock_filter_for_cell,
_cells_to_pairs=mock_cells_to_pairs):
result = table.cells(row_key, column, versions=versions,
timestamp=timestamp,
include_timestamp=include_timestamp)

self.assertEqual(result, fake_result)

read_row_args = (row_key,)
read_row_kwargs = {'filter_': fake_filter}
self.assertEqual(table._low_level_table.read_row_calls, [
(read_row_args, read_row_kwargs),
])

filter_kwargs = {'versions': versions, 'timestamp': timestamp}
mock_filter_for_cell.check_called(
self, [(column,)], [filter_kwargs])
to_pairs_kwargs = {'include_timestamp': include_timestamp}
mock_cells_to_pairs.check_called(
self, [(fake_cells,)], [to_pairs_kwargs])

def test_scan(self):
name = 'table-name'
Expand Down Expand Up @@ -569,6 +741,8 @@ def __init__(self, *args, **kwargs):
self.list_column_families_calls = 0
self.column_families = {}
self.row_values = {}
self.read_row_calls = []
self.read_row_result = None

def list_column_families(self):
self.list_column_families_calls += 1
Expand All @@ -577,6 +751,10 @@ def list_column_families(self):
def row(self, row_key):
return self.row_values[row_key]

def read_row(self, *args, **kwargs):
self.read_row_calls.append((args, kwargs))
return self.read_row_result


class _MockLowLevelColumnFamily(object):

Expand Down

0 comments on commit 4341833

Please sign in to comment.