fix: Prevent sending full table scan when retrying (#554)
* fix: Prevent sending full table scan when retrying

Update the retry logic: don't send empty row_keys and empty row_ranges
if the original request didn't ask for them, since an empty RowSet is
treated as a full table scan.

Closes internal issue 214449800

* Create an InvalidRetryRequest exception.
Raise InvalidRetryRequest instead of StopIteration.
Catch InvalidRetryRequest in the read-rows iterator and cancel the read.
Stop retrying once the row_limit has been reached.

* Improve test coverage

* Improve test coverage
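
For illustration, here is a minimal, self-contained sketch of the guard this change introduces. The function build_retry_rowset and its parameters are hypothetical stand-ins, not the library API; the actual logic lives in build_updated_request in the diff below.

class InvalidRetryRequest(RuntimeError):
    """Raised when there is nothing left to retry."""


def build_retry_rowset(row_keys, row_ranges, last_scanned_key,
                       rows_limit=0, rows_read_so_far=0):
    # Hypothetical sketch of the retry guard; not the library implementation.
    remaining_limit = None
    if rows_limit:
        remaining_limit = rows_limit - rows_read_so_far
        if remaining_limit <= 0:
            # The row limit was already satisfied; there is nothing to retry.
            raise InvalidRetryRequest("rows_limit already reached")

    if not row_keys and not row_ranges:
        # The original request really was a full scan: resume after the
        # last key that was scanned.
        return [], [(last_scanned_key, None)], remaining_limit

    # Drop keys and ranges that have already been read.
    remaining_keys = [key for key in row_keys if key > last_scanned_key]
    remaining_ranges = [
        (start, end) for start, end in row_ranges
        if end is None or end > last_scanned_key
    ]

    if not remaining_keys and not remaining_ranges:
        # Sending an empty RowSet here would silently turn the retry into
        # a full table scan, which is exactly what this commit prevents.
        raise InvalidRetryRequest("all requested rows already read")

    return remaining_keys, remaining_ranges, remaining_limit

For example, build_retry_rowset([b"row_key1"], [], last_scanned_key=b"row_key1") raises InvalidRetryRequest instead of returning an empty RowSet.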
Mariatta committed Apr 7, 2022
1 parent ec7cc42 commit 56f5357
Showing 3 changed files with 150 additions and 12 deletions.
26 changes: 20 additions & 6 deletions google/cloud/bigtable/row_data.py
@@ -321,11 +321,15 @@ def cell_values(self, column_family_id, column, max_count=None):


class InvalidReadRowsResponse(RuntimeError):
"""Exception raised to to invalid response data from back-end."""
"""Exception raised to invalid response data from back-end."""


class InvalidChunk(RuntimeError):
"""Exception raised to to invalid chunk data from back-end."""
"""Exception raised to invalid chunk data from back-end."""


class InvalidRetryRequest(RuntimeError):
"""Exception raised when retry request is invalid."""


def _retry_read_rows_exception(exc):
@@ -486,6 +490,9 @@ def __iter__(self):
if self.state != self.NEW_ROW:
raise ValueError("The row remains partial / is not committed.")
break
except InvalidRetryRequest:
self._cancelled = True
break

for chunk in response.chunks:
if self._cancelled:
@@ -629,10 +636,11 @@ def build_updated_request(self):
data_messages_v2_pb2.ReadRowsRequest.copy_from(resume_request, self.message)

if self.message.rows_limit != 0:
# TODO: Throw an error if rows_limit - read_so_far is 0 or negative.
resume_request.rows_limit = max(
1, self.message.rows_limit - self.rows_read_so_far
)
row_limit_remaining = self.message.rows_limit - self.rows_read_so_far
if row_limit_remaining > 0:
resume_request.rows_limit = row_limit_remaining
else:
raise InvalidRetryRequest

# if neither RowSet.row_keys nor RowSet.row_ranges currently exist,
# add row_range that starts with last_scanned_key as start_key_open
@@ -643,6 +651,12 @@
else:
row_keys = self._filter_rows_keys()
row_ranges = self._filter_row_ranges()

if len(row_keys) == 0 and len(row_ranges) == 0:
# Avoid sending empty row_keys and row_ranges
# if that was not the intention
raise InvalidRetryRequest

resume_request.rows = data_v2_pb2.RowSet(
row_keys=row_keys, row_ranges=row_ranges
)
82 changes: 82 additions & 0 deletions tests/unit/test_row_data.py
@@ -810,6 +810,18 @@ def test_RRRM__filter_row_key():
assert expected_row_keys == row_keys


def test_RRRM__filter_row_key_is_empty():
table_name = "table_name"
request = _ReadRowsRequestPB(table_name=table_name)
request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key3", b"row_key4"])

last_scanned_key = b"row_key4"
request_manager = _make_read_rows_request_manager(request, last_scanned_key, 4)
row_keys = request_manager._filter_rows_keys()

assert row_keys == []


def test_RRRM__filter_row_ranges_all_ranges_added_back(rrrm_data):
from google.cloud.bigtable_v2.types import data as data_v2_pb2

@@ -1036,6 +1048,76 @@ def test_RRRM__key_already_read():
assert not request_manager._key_already_read(b"row_key16")


def test_RRRM__rows_limit_reached():
from google.cloud.bigtable.row_data import InvalidRetryRequest

last_scanned_key = b"row_key14"
request = _ReadRowsRequestPB(table_name=TABLE_NAME)
request.rows_limit = 2
request_manager = _make_read_rows_request_manager(
request, last_scanned_key=last_scanned_key, rows_read_so_far=2
)
with pytest.raises(InvalidRetryRequest):
request_manager.build_updated_request()


def test_RRRM_build_updated_request_last_row_read_raises_invalid_retry_request():
from google.cloud.bigtable.row_data import InvalidRetryRequest

last_scanned_key = b"row_key4"
request = _ReadRowsRequestPB(table_name=TABLE_NAME)
request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key4"])

request_manager = _make_read_rows_request_manager(
request, last_scanned_key, rows_read_so_far=3
)
with pytest.raises(InvalidRetryRequest):
request_manager.build_updated_request()


def test_RRRM_build_updated_request_row_ranges_read_raises_invalid_retry_request():
from google.cloud.bigtable.row_data import InvalidRetryRequest
from google.cloud.bigtable import row_set

row_range1 = row_set.RowRange(b"row_key21", b"row_key29")

request = _ReadRowsRequestPB(table_name=TABLE_NAME)
request.rows.row_ranges.append(row_range1.get_range_kwargs())

last_scanned_key = b"row_key4"
request = _ReadRowsRequestPB(
table_name=TABLE_NAME,
)
request.rows.row_ranges.append(row_range1.get_range_kwargs())

request_manager = _make_read_rows_request_manager(
request, last_scanned_key, rows_read_so_far=2
)
with pytest.raises(InvalidRetryRequest):
request_manager.build_updated_request()


def test_RRRM_build_updated_request_row_ranges_valid():
from google.cloud.bigtable import row_set

row_range1 = row_set.RowRange(b"row_key21", b"row_key29")

request = _ReadRowsRequestPB(table_name=TABLE_NAME)
request.rows.row_ranges.append(row_range1.get_range_kwargs())

last_scanned_key = b"row_key21"
request = _ReadRowsRequestPB(
table_name=TABLE_NAME,
)
request.rows.row_ranges.append(row_range1.get_range_kwargs())

request_manager = _make_read_rows_request_manager(
request, last_scanned_key, rows_read_so_far=1
)
updated_request = request_manager.build_updated_request()
assert len(updated_request.rows.row_ranges) > 0


@pytest.fixture(scope="session")
def json_tests():
dirname = os.path.dirname(__file__)
54 changes: 48 additions & 6 deletions tests/unit/test_table.py
@@ -910,7 +910,6 @@ def mock_create_row_request(table_name, **kwargs):

def test_table_read_retry_rows():
from google.api_core import retry
from google.cloud.bigtable.table import _create_row_request

credentials = _make_credentials()
client = _make_client(project="project-id", credentials=credentials, admin=True)
@@ -965,12 +964,55 @@ def test_table_read_retry_rows():
result = rows[1]
assert result.row_key == ROW_KEY_2

    expected_request = _create_row_request(
        table.name,
        start_key=ROW_KEY_1,
        end_key=ROW_KEY_2,
    )
    data_api.read_rows.mock_calls = [expected_request] * 3
    assert len(data_api.read_rows.mock_calls) == 3


def test_table_read_retry_rows_no_full_table_scan():
from google.api_core import retry

credentials = _make_credentials()
client = _make_client(project="project-id", credentials=credentials, admin=True)
data_api = client._table_data_client = _make_data_api()
instance = client.instance(instance_id=INSTANCE_ID)
table = _make_table(TABLE_ID, instance)

retry_read_rows = retry.Retry(predicate=_read_rows_retry_exception)

# Create response_iterator
chunk_1 = _ReadRowsResponseCellChunkPB(
row_key=ROW_KEY_2,
family_name=FAMILY_NAME,
qualifier=QUALIFIER,
timestamp_micros=TIMESTAMP_MICROS,
value=VALUE,
commit_row=True,
)

response_1 = _ReadRowsResponseV2([chunk_1])
response_failure_iterator_2 = _MockFailureIterator_2([response_1])

data_api.table_path.return_value = (
f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}"
)

data_api.read_rows.side_effect = [
response_failure_iterator_2,
]

rows = [
row
for row in table.read_rows(
start_key="doesn't matter", end_key=ROW_KEY_2, retry=retry_read_rows
)
]
assert len(rows) == 1
result = rows[0]
assert result.row_key == ROW_KEY_2

assert len(data_api.read_rows.mock_calls) == 1
assert (
len(data_api.read_rows.mock_calls[0].args[0].rows.row_ranges) > 0
) # not empty row_ranges


def test_table_yield_retry_rows():
