
Commit

Merge 8b257c0 into 1a91395
vikas-jamdar committed Jul 14, 2018
2 parents 1a91395 + 8b257c0 commit 55564a3
Showing 3 changed files with 19 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -27,6 +27,7 @@ pip-log.txt
 .coverage
 .tox
 .cache
+.pytest_cache
 
 # Translations
 *.mo
35 changes: 12 additions & 23 deletions src/google/cloud/happybase/table.py
@@ -235,21 +235,17 @@ def rows(self, rows, columns=None, timestamp=None,
         # versions == 1 since we only want the latest.
         filter_ = _filter_chain_helper(versions=1, timestamp=timestamp,
                                        filters=filters)
-
-        partial_rows_data = self._low_level_table.read_rows(filter_=filter_)
+        rows_generator = self._low_level_table.yield_rows(
+            filter_=filter_)
         # NOTE: We could use max_loops = 1000 or some similar value to ensure
         #       that the stream isn't open too long.
-        partial_rows_data.consume_all()
 
         result = []
-        for row_key in rows:
-            if row_key not in partial_rows_data.rows:
-                continue
-            curr_row_data = partial_rows_data.rows[row_key]
+        for rowdata in rows_generator:
+            curr_row_data = rowdata
             curr_row_dict = _partial_row_to_dict(
                 curr_row_data, include_timestamp=include_timestamp)
-            result.append((row_key, curr_row_dict))
-
+            result.append((curr_row_data.row_key, curr_row_dict))
         return result
 
     def cells(self, row, column, versions=None, timestamp=None,
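
In effect, rows() now converts each row as it streams off the yield_rows() generator instead of buffering everything with read_rows() + consume_all(). A minimal sketch of the resulting control flow, assuming the module's _partial_row_to_dict() helper and a low-level table whose yield_rows() yields PartialRowData objects exposing row_key; the function name and parameters below are illustrative, not part of the commit:

    def _rows_via_stream(low_level_table, filter_, include_timestamp=False):
        # Sketch only: mirrors the body of the new rows(). Each streamed
        # PartialRowData is converted as it arrives rather than after a
        # blocking consume_all() pass.
        result = []
        for partial_row in low_level_table.yield_rows(filter_=filter_):
            row_dict = _partial_row_to_dict(
                partial_row, include_timestamp=include_timestamp)
            result.append((partial_row.row_key, row_dict))
        return result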
@@ -385,23 +381,16 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
         row_start, row_stop, filter_chain = _scan_filter_helper(
             row_start, row_stop, row_prefix, columns, timestamp, limit, kwargs)
 
-        partial_rows_data = self._low_level_table.read_rows(
+        rows_generator = self._low_level_table.yield_rows(
             start_key=row_start, end_key=row_stop,
             limit=limit, filter_=filter_chain)
 
-        # Mutable copy of data.
-        rows_dict = partial_rows_data.rows
-        while True:
-            try:
-                partial_rows_data.consume_next()
-                for row_key in sorted(rows_dict):
-                    curr_row_data = rows_dict.pop(row_key)
-                    # NOTE: We expect len(rows_dict) == 0, but don't check it.
-                    curr_row_dict = _partial_row_to_dict(
-                        curr_row_data, include_timestamp=include_timestamp)
-                    yield (row_key, curr_row_dict)
-            except StopIteration:
-                break
+        for rowdata in rows_generator:
+            curr_row_data = rowdata
+            # NOTE: We expect len(rows_dict) == 0, but don't check it.
+            curr_row_dict = _partial_row_to_dict(
+                curr_row_data, include_timestamp=include_timestamp)
+            yield (curr_row_data.row_key, curr_row_dict)
 
     def put(self, row, data, timestamp=None, wal=_WAL_SENTINEL):
         """Insert data into a row in this table.
19 changes: 6 additions & 13 deletions unit_tests/test_table.py
@@ -283,7 +283,6 @@ def test_rows_with_columns(self):
         table._low_level_table = _MockLowLevelTable()
         rr_result = _MockPartialRowsData()
         table._low_level_table.read_rows_result = rr_result
-        self.assertEqual(rr_result.consume_all_calls, 0)
 
         # Set-up mocks.
         fake_col_filter = object()
@@ -322,7 +321,6 @@ def mock_filter_chain_helper(**kwargs):
         self.assertEqual(table._low_level_table.read_rows_calls, [
             (read_rows_args, read_rows_kwargs),
         ])
-        self.assertEqual(rr_result.consume_all_calls, 1)
 
         self.assertEqual(mock_cols, [(columns,)])
         self.assertEqual(mock_rows, [(rows,)])
@@ -350,7 +348,6 @@ def test_rows_with_results(self):
         # Return row1 but not row2
         rr_result = _MockPartialRowsData(rows={row_key1: row1})
         table._low_level_table.read_rows_result = rr_result
-        self.assertEqual(rr_result.consume_all_calls, 0)
 
         # Set-up mocks.
         fake_rows_filter = object()
@@ -393,7 +390,6 @@ def mock_cells_to_pairs(*args, **kwargs):
         self.assertEqual(table._low_level_table.read_rows_calls, [
             (read_rows_args, read_rows_kwargs),
         ])
-        self.assertEqual(rr_result.consume_all_calls, 1)
 
         self.assertEqual(mock_rows, [(rows,)])
         expected_kwargs = {
@@ -649,8 +645,6 @@ def mock_filter_chain_helper(**kwargs):
         self.assertEqual(table._low_level_table.read_rows_calls, [
             (read_rows_args, read_rows_kwargs),
         ])
-        self.assertEqual(rr_result.consume_next_calls,
-                         rr_result.iterations + 1)
 
         if columns is not None:
             self.assertEqual(mock_columns, [(columns,)])
@@ -1472,9 +1466,13 @@ def read_row(self, *args, **kwargs):
         self.read_row_calls.append((args, kwargs))
         return self.read_row_result
 
-    def read_rows(self, *args, **kwargs):
+    def yield_rows(self, *args, **kwargs):
         self.read_rows_calls.append((args, kwargs))
-        return self.read_rows_result
+        self.read_rows_result.consume_all()
+        rows_dict = self.read_rows_result.rows
+        for row_key in sorted(rows_dict):
+            curr_row_data = rows_dict.pop(row_key)
+            yield curr_row_data
 
 
 class _MockLowLevelRow(object):
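
A rough sketch of how the reworked mock behaves on its own, using the _MockLowLevelTable and _MockPartialRowsData helpers defined in this test module; the row payloads are arbitrary placeholders rather than real PartialRowData objects, and the actual tests assert on different details:

    mock_table = _MockLowLevelTable()
    mock_table.read_rows_result = _MockPartialRowsData(
        rows={b'row-2': 'data-2', b'row-1': 'data-1'})

    # The generator body runs on iteration: it records the call, invokes
    # consume_all() on the canned result, then drains the rows dict in
    # sorted key order, yielding one stand-in row at a time.
    yielded = list(mock_table.yield_rows(filter_=object()))
    assert yielded == ['data-1', 'data-2']
    assert len(mock_table.read_rows_calls) == 1
    assert mock_table.read_rows_result.consume_all_calls == 1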
@@ -1528,8 +1526,3 @@ def __init__(self, rows=None, iterations=0):
 
     def consume_all(self):
         self.consume_all_calls += 1
-
-    def consume_next(self):
-        self.consume_next_calls += 1
-        if self.consume_next_calls > self.iterations:
-            raise StopIteration
