Upstream changes.
dhermes committed Feb 24, 2016
1 parent 8d91feb commit c1cd1be
Showing 3 changed files with 28 additions and 23 deletions.
19 changes: 13 additions & 6 deletions gcloud_bigtable/happybase/batch.py
@@ -16,15 +16,23 @@


import datetime
+import warnings

import six

from gcloud_bigtable._non_upstream_helpers import _microseconds_to_timestamp
from gcloud_bigtable.row import TimestampRange


+# BEGIN: Renames for upstream.
+_datetime_from_microseconds = _microseconds_to_timestamp
+# END: Renames for upstream.
_WAL_SENTINEL = object()
# Assumed granularity of timestamps in Cloud Bigtable.
_ONE_MILLISECOND = datetime.timedelta(microseconds=1000)
+_WARN = warnings.warn
+_WAL_WARNING = ('The wal argument (Write-Ahead-Log) is not '
+                'supported by Cloud Bigtable.')


def _get_column_pairs(columns, require_qualifier=False):
@@ -106,15 +114,12 @@ class Batch(object):
             is set and ``transaction=True``.
             :class:`ValueError <exceptions.ValueError>` if ``batch_size``
             is not positive.
-             :class:`ValueError <exceptions.ValueError>` if ``wal``
-             is used.
    """

    def __init__(self, table, timestamp=None, batch_size=None,
                 transaction=False, wal=_WAL_SENTINEL):
        if wal is not _WAL_SENTINEL:
-            raise ValueError('The wal argument cannot be used with '
-                             'Cloud Bigtable.')
+            _WARN(_WAL_WARNING)

        if batch_size is not None:
            if transaction:
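The hunk above relies on the sentinel-default pattern: a unique object() default lets the code detect whether the caller passed wal at all (even an explicit None differs from the sentinel), and this commit downgrades that case from a ValueError to a warning. A minimal self-contained sketch of the same pattern; the make_batch name is illustrative, not part of the library:

import warnings

_WAL_SENTINEL = object()  # unique default: distinguishes "not passed" from None
_WAL_WARNING = ('The wal argument (Write-Ahead-Log) is not '
                'supported by Cloud Bigtable.')


def make_batch(wal=_WAL_SENTINEL):
    # Any explicit value -- even wal=None -- is not the sentinel object,
    # so passing wal at all triggers the warning.
    if wal is not _WAL_SENTINEL:
        warnings.warn(_WAL_WARNING)


make_batch()           # silent
make_batch(wal=False)  # UserWarning: The wal argument (Write-Ahead-Log) ...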
@@ -125,17 +130,19 @@ def __init__(self, table, timestamp=None, batch_size=None,

        self._table = table
        self._batch_size = batch_size
-        # Timestamp is in milliseconds, convert to microseconds.
+        self._timestamp = self._delete_range = None
+
+        # Timestamp is in milliseconds, convert to microseconds.
        if timestamp is not None:
-            self._timestamp = _microseconds_to_timestamp(1000 * timestamp)
+            self._timestamp = _datetime_from_microseconds(1000 * timestamp)
            # For deletes, we get the very next timestamp (assuming timestamp
            # granularity is milliseconds). This is because HappyBase users
            # expect HBase deletes to go **up to** and **including** the
            # timestamp while Cloud Bigtable Time Ranges **exclude** the
            # final timestamp.
            next_timestamp = self._timestamp + _ONE_MILLISECOND
            self._delete_range = TimestampRange(end=next_timestamp)

        self._transaction = transaction

        # Internal state for tracking mutations.
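A worked example of the timestamp arithmetic in the hunk above: HappyBase passes integer milliseconds since the epoch, the code scales by 1000 to microseconds before converting to a datetime, and deletes extend the end by one millisecond because a Bigtable TimestampRange excludes its endpoint while HBase deletes include theirs. The epoch-based conversion below is an illustrative stand-in for _datetime_from_microseconds:

import datetime

_EPOCH = datetime.datetime(1970, 1, 1)
_ONE_MILLISECOND = datetime.timedelta(microseconds=1000)

# HappyBase-style timestamp: integer milliseconds since the epoch.
timestamp = 1456272000123

# Scale to microseconds, then to a datetime (stand-in for
# _datetime_from_microseconds).
as_datetime = _EPOCH + datetime.timedelta(microseconds=1000 * timestamp)

# HBase deletes include `timestamp`; a Bigtable TimestampRange excludes
# its end, so bumping the end by the assumed one-millisecond granularity
# makes the exclusive range cover the same final instant.
delete_end = as_datetime + _ONE_MILLISECOND

print(as_datetime)  # 2016-02-24 00:00:00.123000
print(delete_end)   # 2016-02-24 00:00:00.124000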
12 changes: 8 additions & 4 deletions gcloud_bigtable/happybase/table.py
@@ -349,7 +349,7 @@ class Table(object):
    def __init__(self, name, connection):
        self.name = name
        # This remains as legacy for HappyBase, but only the cluster
-        # from it is needed.
+        # from the connection is needed.
        self.connection = connection
        self._low_level_table = None
        if self.connection is not None:
@@ -367,8 +367,10 @@ def families(self):
                  for a column family.
        """
        column_family_map = self._low_level_table.list_column_families()
-        return {col_fam: _gc_rule_to_dict(col_fam_obj.gc_rule)
-                for col_fam, col_fam_obj in column_family_map.items()}
+        result = {}
+        for col_fam, col_fam_obj in six.iteritems(column_family_map):
+            result[col_fam] = _gc_rule_to_dict(col_fam_obj.gc_rule)
+        return result

    def regions(self):
        """Retrieve the regions for this table.
@@ -787,7 +789,7 @@ def counter_get(self, row, column):
        :returns: Counter value (after initializing / incrementing by 0).
        """
        # Don't query directly, but increment with value=0 so that the counter
-        # is correctly initialised if didn't exist yet.
+        # is correctly initialized if didn't exist yet.
        return self.counter_inc(row, column, value=0)

    def counter_set(self, row, column, value=0):
@@ -836,6 +838,8 @@ def counter_inc(self, row, column, value=1):
        :returns: Counter value after incrementing.
        """
        row = self._low_level_table.row(row)
+        if isinstance(column, six.binary_type):
+            column = column.decode('utf-8')
        column_family_id, column_qualifier = column.split(':')
        row.increment_cell_value(column_family_id, column_qualifier, value)
        modified_cells = row.commit_modifications()
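The two lines added to counter_inc above make the column argument tolerant of bytes, which HappyBase callers commonly pass; decoding first means the subsequent 'family:qualifier' split always operates on text. A standalone sketch of that normalization (the _split_column helper is illustrative, not a library function):

import six


def _split_column(column):
    # HappyBase callers may pass bytes (b'cf1:counter') or text.
    if isinstance(column, six.binary_type):
        column = column.decode('utf-8')
    # HBase-style column names are '<family>:<qualifier>'.
    column_family_id, column_qualifier = column.split(':')
    return column_family_id, column_qualifier


print(_split_column(b'cf1:counter'))  # ('cf1', 'counter')
print(_split_column(u'cf1:counter'))  # ('cf1', 'counter')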
20 changes: 7 additions & 13 deletions gcloud_bigtable/table.py
@@ -228,9 +228,7 @@ def read_row(self, row_key, filter_=None):
        :type row_key: bytes
        :param row_key: The key of the row to read from.

-        :type filter_: :class:`.row.RowFilter`, :class:`.row.RowFilterChain`,
-                       :class:`.row.RowFilterUnion` or
-                       :class:`.row.ConditionalRowFilter`
+        :type filter_: :class:`.row.RowFilter`
        :param filter_: (Optional) The filter to apply to the contents of the
                        row. If unset, returns the entire row.
@@ -272,13 +270,6 @@ def read_rows(self, start_key=None, end_key=None,
                        The range will not include ``end_key``. If left empty,
                        will be interpreted as an infinite string.
-
-        :type filter_: :class:`.row.RowFilter`, :class:`.row.RowFilterChain`,
-                       :class:`.row.RowFilterUnion` or
-                       :class:`.row.ConditionalRowFilter`
-        :param filter_: (Optional) The filter to apply to the contents of the
-                        specified row(s). If unset, reads every column in
-                        each row.
        :type allow_row_interleaving: bool
        :param allow_row_interleaving: (Optional) By default, rows are read
                                       sequentially, producing results which
@@ -300,6 +291,11 @@
                   more than N rows. However, only N ``commit_row`` chunks
                   will be sent.

+        :type filter_: :class:`.row.RowFilter`
+        :param filter_: (Optional) The filter to apply to the contents of the
+                        specified row(s). If unset, reads every column in
+                        each row.
+
        :rtype: :class:`.PartialRowsData`
        :returns: A :class:`.PartialRowsData` convenience wrapper for consuming
                  the streamed results.
@@ -372,9 +368,7 @@ def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
                     The range will not include ``end_key``. If left empty,
                     will be interpreted as an infinite string.

-    :type filter_: :class:`.row.RowFilter`, :class:`.row.RowFilterChain`,
-                   :class:`.row.RowFilterUnion` or
-                   :class:`.row.ConditionalRowFilter`
+    :type filter_: :class:`.row.RowFilter`
    :param filter_: (Optional) The filter to apply to the contents of the
                    specified row(s). If unset, reads the entire table.
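The docstring edits above narrow the documented filter_ type to :class:`.row.RowFilter` and, in read_rows, move the entry next to the return type. A hypothetical call shape consistent with those docstrings; the RowFilter constructor arguments below are illustrative, since the diff documents only the parameter's type, not how a filter is built:

from gcloud_bigtable.row import RowFilter


def read_filtered(table, row_key, start_key, end_key):
    # Illustrative kwargs: the diff only says filter_ is a .row.RowFilter.
    filter_ = RowFilter(row_key_regex_filter=b'^user-')
    one_row = table.read_row(row_key, filter_=filter_)
    # read_rows returns a .PartialRowsData wrapper per the docstring above.
    many_rows = table.read_rows(start_key=start_key, end_key=end_key,
                                filter_=filter_)
    return one_row, many_rows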
