From 56f5357c09ac867491b934f6029776dcd74c6eac Mon Sep 17 00:00:00 2001 From: Mariatta Wijaya Date: Thu, 7 Apr 2022 13:39:58 -0700 Subject: [PATCH] fix: Prevent sending full table scan when retrying (#554) * fix: Prevent sending full table scan when retrying Update the retry logic. Don't send empty row_key and empty row_ranges if the original message didn't ask for those. Closes internal issue 214449800 * Create InvalidRetryRequest exception. Raise InvalidRetryRequest instead of StopIteration. Catch the InvalidRetryRequest. Stop the retry request if row_limit has been reached. * Improve test coverage * Improve test coverage --- google/cloud/bigtable/row_data.py | 26 +++++++--- tests/unit/test_row_data.py | 82 +++++++++++++++++++++++++++++++ tests/unit/test_table.py | 54 +++++++++++++++++--- 3 files changed, 150 insertions(+), 12 deletions(-) diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py index 0517f82e1..0c1565737 100644 --- a/google/cloud/bigtable/row_data.py +++ b/google/cloud/bigtable/row_data.py @@ -321,11 +321,15 @@ def cell_values(self, column_family_id, column, max_count=None): class InvalidReadRowsResponse(RuntimeError): - """Exception raised to to invalid response data from back-end.""" + """Exception raised to invalid response data from back-end.""" class InvalidChunk(RuntimeError): - """Exception raised to to invalid chunk data from back-end.""" + """Exception raised to invalid chunk data from back-end.""" + + +class InvalidRetryRequest(RuntimeError): + """Exception raised when retry request is invalid.""" def _retry_read_rows_exception(exc): @@ -486,6 +490,9 @@ def __iter__(self): if self.state != self.NEW_ROW: raise ValueError("The row remains partial / is not committed.") break + except InvalidRetryRequest: + self._cancelled = True + break for chunk in response.chunks: if self._cancelled: @@ -629,10 +636,11 @@ def build_updated_request(self): data_messages_v2_pb2.ReadRowsRequest.copy_from(resume_request, 
self.message) if self.message.rows_limit != 0: - # TODO: Throw an error if rows_limit - read_so_far is 0 or negative. - resume_request.rows_limit = max( - 1, self.message.rows_limit - self.rows_read_so_far - ) + row_limit_remaining = self.message.rows_limit - self.rows_read_so_far + if row_limit_remaining > 0: + resume_request.rows_limit = row_limit_remaining + else: + raise InvalidRetryRequest # if neither RowSet.row_keys nor RowSet.row_ranges currently exist, # add row_range that starts with last_scanned_key as start_key_open @@ -643,6 +651,12 @@ def build_updated_request(self): else: row_keys = self._filter_rows_keys() row_ranges = self._filter_row_ranges() + + if len(row_keys) == 0 and len(row_ranges) == 0: + # Avoid sending empty row_keys and row_ranges + # if that was not the intention + raise InvalidRetryRequest + resume_request.rows = data_v2_pb2.RowSet( row_keys=row_keys, row_ranges=row_ranges ) diff --git a/tests/unit/test_row_data.py b/tests/unit/test_row_data.py index d647bbaba..9b329dc9f 100644 --- a/tests/unit/test_row_data.py +++ b/tests/unit/test_row_data.py @@ -810,6 +810,18 @@ def test_RRRM__filter_row_key(): assert expected_row_keys == row_keys +def test_RRRM__filter_row_key_is_empty(): + table_name = "table_name" + request = _ReadRowsRequestPB(table_name=table_name) + request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key3", b"row_key4"]) + + last_scanned_key = b"row_key4" + request_manager = _make_read_rows_request_manager(request, last_scanned_key, 4) + row_keys = request_manager._filter_rows_keys() + + assert row_keys == [] + + def test_RRRM__filter_row_ranges_all_ranges_added_back(rrrm_data): from google.cloud.bigtable_v2.types import data as data_v2_pb2 @@ -1036,6 +1048,76 @@ def test_RRRM__key_already_read(): assert not request_manager._key_already_read(b"row_key16") +def test_RRRM__rows_limit_reached(): + from google.cloud.bigtable.row_data import InvalidRetryRequest + + last_scanned_key = b"row_key14" + request = 
_ReadRowsRequestPB(table_name=TABLE_NAME) + request.rows_limit = 2 + request_manager = _make_read_rows_request_manager( + request, last_scanned_key=last_scanned_key, rows_read_so_far=2 + ) + with pytest.raises(InvalidRetryRequest): + request_manager.build_updated_request() + + +def test_RRRM_build_updated_request_last_row_read_raises_invalid_retry_request(): + from google.cloud.bigtable.row_data import InvalidRetryRequest + + last_scanned_key = b"row_key4" + request = _ReadRowsRequestPB(table_name=TABLE_NAME) + request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key4"]) + + request_manager = _make_read_rows_request_manager( + request, last_scanned_key, rows_read_so_far=3 + ) + with pytest.raises(InvalidRetryRequest): + request_manager.build_updated_request() + + +def test_RRRM_build_updated_request_row_ranges_read_raises_invalid_retry_request(): + from google.cloud.bigtable.row_data import InvalidRetryRequest + from google.cloud.bigtable import row_set + + row_range1 = row_set.RowRange(b"row_key21", b"row_key29") + + request = _ReadRowsRequestPB(table_name=TABLE_NAME) + request.rows.row_ranges.append(row_range1.get_range_kwargs()) + + last_scanned_key = b"row_key4" + request = _ReadRowsRequestPB( + table_name=TABLE_NAME, + ) + request.rows.row_ranges.append(row_range1.get_range_kwargs()) + + request_manager = _make_read_rows_request_manager( + request, last_scanned_key, rows_read_so_far=2 + ) + with pytest.raises(InvalidRetryRequest): + request_manager.build_updated_request() + + +def test_RRRM_build_updated_request_row_ranges_valid(): + from google.cloud.bigtable import row_set + + row_range1 = row_set.RowRange(b"row_key21", b"row_key29") + + request = _ReadRowsRequestPB(table_name=TABLE_NAME) + request.rows.row_ranges.append(row_range1.get_range_kwargs()) + + last_scanned_key = b"row_key21" + request = _ReadRowsRequestPB( + table_name=TABLE_NAME, + ) + request.rows.row_ranges.append(row_range1.get_range_kwargs()) + + request_manager = 
_make_read_rows_request_manager( + request, last_scanned_key, rows_read_so_far=1 + ) + updated_request = request_manager.build_updated_request() + assert len(updated_request.rows.row_ranges) > 0 + + @pytest.fixture(scope="session") def json_tests(): dirname = os.path.dirname(__file__) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index fb4ec3539..883f713d8 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -910,7 +910,6 @@ def mock_create_row_request(table_name, **kwargs): def test_table_read_retry_rows(): from google.api_core import retry - from google.cloud.bigtable.table import _create_row_request credentials = _make_credentials() client = _make_client(project="project-id", credentials=credentials, admin=True) @@ -965,12 +964,55 @@ def test_table_read_retry_rows(): result = rows[1] assert result.row_key == ROW_KEY_2 - expected_request = _create_row_request( - table.name, - start_key=ROW_KEY_1, - end_key=ROW_KEY_2, + assert len(data_api.read_rows.mock_calls) == 3 + + +def test_table_read_retry_rows_no_full_table_scan(): + from google.api_core import retry + + credentials = _make_credentials() + client = _make_client(project="project-id", credentials=credentials, admin=True) + data_api = client._table_data_client = _make_data_api() + instance = client.instance(instance_id=INSTANCE_ID) + table = _make_table(TABLE_ID, instance) + + retry_read_rows = retry.Retry(predicate=_read_rows_retry_exception) + + # Create response_iterator + chunk_1 = _ReadRowsResponseCellChunkPB( + row_key=ROW_KEY_2, + family_name=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + value=VALUE, + commit_row=True, ) - data_api.read_rows.mock_calls = [expected_request] * 3 + + response_1 = _ReadRowsResponseV2([chunk_1]) + response_failure_iterator_2 = _MockFailureIterator_2([response_1]) + + data_api.table_path.return_value = ( + f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}" + ) + + 
data_api.read_rows.side_effect = [ + response_failure_iterator_2, + ] + + rows = [ + row + for row in table.read_rows( + start_key="doesn't matter", end_key=ROW_KEY_2, retry=retry_read_rows + ) + ] + assert len(rows) == 1 + result = rows[0] + assert result.row_key == ROW_KEY_2 + + assert len(data_api.read_rows.mock_calls) == 1 + assert ( + len(data_api.read_rows.mock_calls[0].args[0].rows.row_ranges) > 0 + ) # not empty row_ranges def test_table_yield_retry_rows():