Skip to content

Commit

Permalink
Merge de887c1 into 1a91395
Browse files Browse the repository at this point in the history
  • Loading branch information
vikas-jamdar committed Jul 12, 2018
2 parents 1a91395 + de887c1 commit d94086c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pip-log.txt
.coverage
.tox
.cache
.pytest_cache

# Translations
*.mo
Expand Down
35 changes: 12 additions & 23 deletions src/google/cloud/happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,17 @@ def rows(self, rows, columns=None, timestamp=None,
# versions == 1 since we only want the latest.
filter_ = _filter_chain_helper(versions=1, timestamp=timestamp,
filters=filters)

partial_rows_data = self._low_level_table.read_rows(filter_=filter_)
rows_generator = self._low_level_table.yield_rows(
filter_=filter_)
# NOTE: We could use max_loops = 1000 or some similar value to ensure
# that the stream isn't open too long.
partial_rows_data.consume_all()

result = []
for row_key in rows:
if row_key not in partial_rows_data.rows:
continue
curr_row_data = partial_rows_data.rows[row_key]
for rowdata in rows_generator:
curr_row_data = rowdata
curr_row_dict = _partial_row_to_dict(
curr_row_data, include_timestamp=include_timestamp)
result.append((row_key, curr_row_dict))

result.append((curr_row_data.row_key, curr_row_dict))
return result

def cells(self, row, column, versions=None, timestamp=None,
Expand Down Expand Up @@ -385,23 +381,16 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
row_start, row_stop, filter_chain = _scan_filter_helper(
row_start, row_stop, row_prefix, columns, timestamp, limit, kwargs)

partial_rows_data = self._low_level_table.read_rows(
rows_generator = self._low_level_table.yield_rows(
start_key=row_start, end_key=row_stop,
limit=limit, filter_=filter_chain)

# Mutable copy of data.
rows_dict = partial_rows_data.rows
while True:
try:
partial_rows_data.consume_next()
for row_key in sorted(rows_dict):
curr_row_data = rows_dict.pop(row_key)
# NOTE: We expect len(rows_dict) == 0, but don't check it.
curr_row_dict = _partial_row_to_dict(
curr_row_data, include_timestamp=include_timestamp)
yield (row_key, curr_row_dict)
except StopIteration:
break
for rowdata in rows_generator:
curr_row_data = rowdata
# NOTE: We expect len(rows_dict) == 0, but don't check it.
curr_row_dict = _partial_row_to_dict(
curr_row_data, include_timestamp=include_timestamp)
yield (curr_row_data.row_key, curr_row_dict)

def put(self, row, data, timestamp=None, wal=_WAL_SENTINEL):
"""Insert data into a row in this table.
Expand Down
13 changes: 11 additions & 2 deletions unit_tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1472,9 +1472,18 @@ def read_row(self, *args, **kwargs):
self.read_row_calls.append((args, kwargs))
return self.read_row_result

def read_rows(self, *args, **kwargs):
def yield_rows(self, *args, **kwargs):
self.read_rows_calls.append((args, kwargs))
return self.read_rows_result
self.read_rows_result.consume_all()
rows_dict = self.read_rows_result.rows
while True:
try:
for row_key in sorted(rows_dict):
curr_row_data = rows_dict.pop(row_key)
yield curr_row_data
self.read_rows_result.consume_next()
except StopIteration:
break


class _MockLowLevelRow(object):
Expand Down

0 comments on commit d94086c

Please sign in to comment.