Skip to content

Commit

Permalink
Implemented HappyBase Table.row().
Browse files Browse the repository at this point in the history
This required reworking existing helpers for creating
filters and making some new ones.

This also revealed that using a single filter in a Union
or Chain causes the server to reject the request, hence
we have an extra check if `filters` has only one
element.
  • Loading branch information
dhermes committed Sep 11, 2015
1 parent 4341833 commit 9544a80
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 41 deletions.
111 changes: 95 additions & 16 deletions gcloud_bigtable/happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Google Cloud Bigtable HappyBase table module."""


import six
import struct

from gcloud_bigtable._helpers import _microseconds_to_timestamp
Expand All @@ -23,8 +24,10 @@
from gcloud_bigtable.column_family import GarbageCollectionRuleIntersection
from gcloud_bigtable.happybase.batch import Batch
from gcloud_bigtable.happybase.batch import _WAL_SENTINEL
from gcloud_bigtable.happybase.batch import _get_column_pairs
from gcloud_bigtable.row import RowFilter
from gcloud_bigtable.row import RowFilterChain
from gcloud_bigtable.row import RowFilterUnion
from gcloud_bigtable.row import TimestampRange
from gcloud_bigtable.table import Table as _LowLevelTable

Expand Down Expand Up @@ -179,11 +182,12 @@ def _cells_to_pairs(cells, include_timestamp=False):
return result


def _filter_for_cell(column, versions=None, timestamp=None):
"""Create filter chain to isolate a single column in a row.
def _filter_chain_helper(column=None, versions=None, timestamp=None,
filters=None):
"""Create filter chain to limit a results set.
:type column: str
:param column: The column (``fam:col``) to be selected
:param column: (Optional) The column (``fam:col``) to be selected
with the filter.
:type versions: int
Expand All @@ -194,20 +198,73 @@ def _filter_for_cell(column, versions=None, timestamp=None):
epoch). If specified, only cells returned before (or
at) the timestamp will be matched.
:rtype: :class:`.RowFilterChain`
:returns: The chained filter which identifies a single column.
:type filters: list
:param filters: (Optional) List of existing filters to be extended.
:rtype: :class:`.RowFilterChain`, :class:`.RowFilter`
:returns: The chained filter created, or just a single filter if only
one was needed.
:raises: :class:`ValueError <exceptions.ValueError>` if there are no
filters to chain.
"""
column_family_id, column_qualifier = column.split(':')
fam_filter = RowFilter(family_name_regex_filter=column_family_id)
qual_filter = RowFilter(column_qualifier_regex_filter=column_qualifier)
filters = [fam_filter, qual_filter]
if filters is None:
filters = []

if column is not None:
column_family_id, column_qualifier = column.split(':')
fam_filter = RowFilter(family_name_regex_filter=column_family_id)
qual_filter = RowFilter(column_qualifier_regex_filter=column_qualifier)
filters.extend([fam_filter, qual_filter])
if versions is not None:
filters.append(RowFilter(cells_per_column_limit_filter=versions))
time_range = _convert_to_time_range(timestamp=timestamp)
if time_range is not None:
filters.append(RowFilter(timestamp_range_filter=time_range))

return RowFilterChain(filters=filters)
num_filters = len(filters)
if num_filters == 0:
raise ValueError('Must have at least one filter.')
elif num_filters == 1:
return filters[0]
else:
return RowFilterChain(filters=filters)


def _columns_filter_helper(columns):
"""Creates a union filter for a list of columns.
:type columns:
:param columns: Iterable containing column names (as strings). Each column
name can be either
* an entire column family: ``fam`` or ``fam:``
* an single column: ``fam:col``
:rtype: :class:`.RowFilterUnion`, :class:`.RowFilter`
:returns: The union filter created containing all of the matched columns.
:raises: :class:`ValueError <exceptions.ValueError>` if there are no
filters to union.
"""
filters = []
for column_family_id, column_qualifier in _get_column_pairs(columns):
if column_qualifier is not None:
fam_filter = RowFilter(family_name_regex_filter=column_family_id)
qual_filter = RowFilter(
column_qualifier_regex_filter=column_qualifier)
combined_filter = RowFilterChain(
filters=[fam_filter, qual_filter])
filters.append(combined_filter)
else:
fam_filter = RowFilter(family_name_regex_filter=column_family_id)
filters.append(fam_filter)

num_filters = len(filters)
if num_filters == 0:
raise ValueError('Must have at least one filter.')
elif num_filters == 1:
return filters[0]
else:
return RowFilterUnion(filters=filters)


class Table(object):
Expand Down Expand Up @@ -285,10 +342,30 @@ def row(self, row, columns=None, timestamp=None, include_timestamp=False):
:param include_timestamp: Flag to indicate if cell timestamps should be
included with the output.
:raises: :class:`NotImplementedError <exceptions.NotImplementedError>`
temporarily until the method is implemented.
:rtype: dict
:returns: Dictionary containing all the latest column values in
the row.
"""
raise NotImplementedError('Temporarily not implemented.')
filters = []
if columns is not None:
filters.append(_columns_filter_helper(columns))
# versions == 1 since we only want the latest.
filter_ = _filter_chain_helper(versions=1, timestamp=timestamp,
filters=filters)

partial_row_data = self._low_level_table.read_row(
row, filter_=filter_)
if partial_row_data is None:
return {}

result = {}
for column, cells in six.iteritems(partial_row_data.to_dict()):
cell_vals = _cells_to_pairs(cells,
include_timestamp=include_timestamp)
# NOTE: We assume there is exactly 1 version since we used that in
# our filter, but we don't check this.
result[column] = cell_vals[0]
return result

def rows(self, rows, columns=None, timestamp=None,
include_timestamp=False):
Expand Down Expand Up @@ -348,17 +425,19 @@ def cells(self, row, column, versions=None, timestamp=None,
:returns: List of values in the cell (with timestamps if
``include_timestamp`` is :data:`True`).
"""
filter_ = _filter_for_cell(column, versions=versions,
timestamp=timestamp)
filter_ = _filter_chain_helper(column=column, versions=versions,
timestamp=timestamp)
partial_row_data = self._low_level_table.read_row(row, filter_=filter_)
if partial_row_data is None:
return []
else:
cells = partial_row_data._cells
# We know that `_filter_chain_helper` has already verified that
# column will split as such.
column_family_id, column_qualifier = column.split(':')
# NOTE: We expect the only key in `cells` is `column_family_id`
# and the only key `cells[column_family_id]` is
# `column_qualifier`. But we don't check that this is true.
column_family_id, column_qualifier = column.split(':')
curr_cells = cells[column_family_id][column_qualifier]
return _cells_to_pairs(
curr_cells, include_timestamp=include_timestamp)
Expand Down

0 comments on commit 9544a80

Please sign in to comment.