fix: Prevent sending full table scan when retrying #554

Merged · 4 commits · Apr 7, 2022
26 changes: 20 additions & 6 deletions google/cloud/bigtable/row_data.py
@@ -321,11 +321,15 @@ def cell_values(self, column_family_id, column, max_count=None):


 class InvalidReadRowsResponse(RuntimeError):
-    """Exception raised to to invalid response data from back-end."""
+    """Exception raised to invalid response data from back-end."""
 
 
 class InvalidChunk(RuntimeError):
-    """Exception raised to to invalid chunk data from back-end."""
+    """Exception raised to invalid chunk data from back-end."""
 
 
+class InvalidRetryRequest(RuntimeError):
+    """Exception raised when retry request is invalid."""
+
+
 def _retry_read_rows_exception(exc):
@@ -486,6 +490,9 @@ def __iter__(self):
                 if self.state != self.NEW_ROW:
                     raise ValueError("The row remains partial / is not committed.")
                 break
+            except InvalidRetryRequest:
+                self._cancelled = True
+                break
 
             for chunk in response.chunks:
                 if self._cancelled:
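
The `__iter__` change above converts `InvalidRetryRequest` into a clean end of iteration: the stream is marked cancelled and the loop exits, rather than surfacing an error to the caller, because the exception only means there is nothing left to fetch. A minimal, self-contained sketch of that control flow — the `Reader` class here is hypothetical, standing in for the library's real `PartialRowsData`:

```python
from typing import Iterable, Iterator


class InvalidRetryRequest(RuntimeError):
    """Raised when a recomputed retry request would read nothing."""


class Reader:
    """Yields responses; stops cleanly if a retry becomes invalid."""

    def __init__(self, responses: Iterable[str]):
        self._responses = iter(responses)
        self._cancelled = False

    def _read_next_response(self) -> str:
        # In the real code this path can rebuild the resume request after a
        # transient failure, and build_updated_request() may raise
        # InvalidRetryRequest when there is nothing left to fetch.
        return next(self._responses)

    def __iter__(self) -> Iterator[str]:
        while not self._cancelled:
            try:
                response = self._read_next_response()
            except StopIteration:
                break
            except InvalidRetryRequest:
                # Everything requested has already been delivered, so the
                # stream is treated as complete rather than retried.
                self._cancelled = True
                break
            yield response


print(list(Reader(["row-1", "row-2"])))  # ['row-1', 'row-2']
```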
@@ -629,10 +636,11 @@ def build_updated_request(self):
         data_messages_v2_pb2.ReadRowsRequest.copy_from(resume_request, self.message)
 
         if self.message.rows_limit != 0:
-            # TODO: Throw an error if rows_limit - read_so_far is 0 or negative.
-            resume_request.rows_limit = max(
-                1, self.message.rows_limit - self.rows_read_so_far
-            )
+            row_limit_remaining = self.message.rows_limit - self.rows_read_so_far
+            if row_limit_remaining > 0:
+                resume_request.rows_limit = row_limit_remaining
+            else:
+                raise InvalidRetryRequest
 
         # if neither RowSet.row_keys nor RowSet.row_ranges currently exist,
         # add row_range that starts with last_scanned_key as start_key_open
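
The limit arithmetic replaces the old `max(1, ...)` clamp, which had an off-by-one failure mode: a stream that errored out after delivering exactly `rows_limit` rows would retry with `rows_limit = 1` and fetch one extra row. A small worked example with hypothetical numbers:

```python
rows_limit = 2        # the caller asked for at most 2 rows
rows_read_so_far = 2  # both rows arrived before the stream failed

# Old behavior: max(1, 2 - 2) == 1, so the retry asked for one more row.
old_resume_limit = max(1, rows_limit - rows_read_so_far)
assert old_resume_limit == 1  # off-by-one: an extra, unwanted row

# New behavior: a non-positive remainder means there is nothing left to
# read, so the retry is rejected with InvalidRetryRequest instead.
row_limit_remaining = rows_limit - rows_read_so_far
assert row_limit_remaining == 0  # -> raise InvalidRetryRequest
```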
@@ -643,6 +651,12 @@ def build_updated_request(self):
         else:
             row_keys = self._filter_rows_keys()
             row_ranges = self._filter_row_ranges()
+
+            if len(row_keys) == 0 and len(row_ranges) == 0:
+                # Avoid sending empty row_keys and row_ranges
+                # if that was not the intention
+                raise InvalidRetryRequest
+
             resume_request.rows = data_v2_pb2.RowSet(
                 row_keys=row_keys, row_ranges=row_ranges
             )
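The empty-`RowSet` guard above is the core of the fix: in the Bigtable v2 `ReadRows` API, a request whose `RowSet` has no `row_keys` and no `row_ranges` is treated as a request for every row in the table — the accidental full table scan this PR's title refers to. A sketch of the failure mode, reusing the proto import aliases from `row_data.py` (the table name is a placeholder):

```python
from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2
from google.cloud.bigtable_v2.types import data as data_v2_pb2

# After a retry filters out every already-read key and range, the RowSet
# can end up empty.
empty_rows = data_v2_pb2.RowSet(row_keys=[], row_ranges=[])
resume_request = data_messages_v2_pb2.ReadRowsRequest(
    table_name="projects/p/instances/i/tables/t",  # placeholder name
    rows=empty_rows,
)

# To the server, an empty RowSet is indistinguishable from "all rows", so
# sending this request would silently become a full table scan. The new
# guard raises InvalidRetryRequest before the request goes out.
assert len(resume_request.rows.row_keys) == 0
assert len(resume_request.rows.row_ranges) == 0
```
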
82 changes: 82 additions & 0 deletions tests/unit/test_row_data.py
@@ -810,6 +810,18 @@ def test_RRRM__filter_row_key():
     assert expected_row_keys == row_keys
 
 
+def test_RRRM__filter_row_key_is_empty():
+    table_name = "table_name"
+    request = _ReadRowsRequestPB(table_name=table_name)
+    request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key3", b"row_key4"])
+
+    last_scanned_key = b"row_key4"
+    request_manager = _make_read_rows_request_manager(request, last_scanned_key, 4)
+    row_keys = request_manager._filter_rows_keys()
+
+    assert row_keys == []
+
+
 def test_RRRM__filter_row_ranges_all_ranges_added_back(rrrm_data):
     from google.cloud.bigtable_v2.types import data as data_v2_pb2
 
@@ -1036,6 +1048,76 @@ def test_RRRM__key_already_read():
     assert not request_manager._key_already_read(b"row_key16")
 
 
+def test_RRRM__rows_limit_reached():
+    from google.cloud.bigtable.row_data import InvalidRetryRequest
+
+    last_scanned_key = b"row_key14"
+    request = _ReadRowsRequestPB(table_name=TABLE_NAME)
+    request.rows_limit = 2
+    request_manager = _make_read_rows_request_manager(
+        request, last_scanned_key=last_scanned_key, rows_read_so_far=2
+    )
+    with pytest.raises(InvalidRetryRequest):
+        request_manager.build_updated_request()
+
+
+def test_RRRM_build_updated_request_last_row_read_raises_invalid_retry_request():
+    from google.cloud.bigtable.row_data import InvalidRetryRequest
+
+    last_scanned_key = b"row_key4"
+    request = _ReadRowsRequestPB(table_name=TABLE_NAME)
+    request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key4"])
+
+    request_manager = _make_read_rows_request_manager(
+        request, last_scanned_key, rows_read_so_far=3
+    )
+    with pytest.raises(InvalidRetryRequest):
+        request_manager.build_updated_request()
+
+
+def test_RRRM_build_updated_request_row_ranges_read_raises_invalid_retry_request():
+    from google.cloud.bigtable.row_data import InvalidRetryRequest
+    from google.cloud.bigtable import row_set
+
+    row_range1 = row_set.RowRange(b"row_key21", b"row_key29")
+
+    request = _ReadRowsRequestPB(table_name=TABLE_NAME)
+    request.rows.row_ranges.append(row_range1.get_range_kwargs())
+
+    last_scanned_key = b"row_key4"
+    request = _ReadRowsRequestPB(
+        table_name=TABLE_NAME,
+    )
+    request.rows.row_ranges.append(row_range1.get_range_kwargs())
+
+    request_manager = _make_read_rows_request_manager(
+        request, last_scanned_key, rows_read_so_far=2
+    )
+    with pytest.raises(InvalidRetryRequest):
+        request_manager.build_updated_request()
+
+
+def test_RRRM_build_updated_request_row_ranges_valid():
+    from google.cloud.bigtable import row_set
+
+    row_range1 = row_set.RowRange(b"row_key21", b"row_key29")
+
+    request = _ReadRowsRequestPB(table_name=TABLE_NAME)
+    request.rows.row_ranges.append(row_range1.get_range_kwargs())
+
+    last_scanned_key = b"row_key21"
+    request = _ReadRowsRequestPB(
+        table_name=TABLE_NAME,
+    )
+    request.rows.row_ranges.append(row_range1.get_range_kwargs())
+
+    request_manager = _make_read_rows_request_manager(
+        request, last_scanned_key, rows_read_so_far=1
+    )
+    updated_request = request_manager.build_updated_request()
+    assert len(updated_request.rows.row_ranges) > 0
+
+
 @pytest.fixture(scope="session")
 def json_tests():
     dirname = os.path.dirname(__file__)
54 changes: 48 additions & 6 deletions tests/unit/test_table.py
@@ -910,7 +910,6 @@ def mock_create_row_request(table_name, **kwargs):

 def test_table_read_retry_rows():
     from google.api_core import retry
-    from google.cloud.bigtable.table import _create_row_request
 
     credentials = _make_credentials()
     client = _make_client(project="project-id", credentials=credentials, admin=True)
@@ -965,12 +964,55 @@ def test_table_read_retry_rows():
     result = rows[1]
     assert result.row_key == ROW_KEY_2
 
-    expected_request = _create_row_request(
-        table.name,
-        start_key=ROW_KEY_1,
-        end_key=ROW_KEY_2,
-    )
-    data_api.read_rows.mock_calls = [expected_request] * 3
+    assert len(data_api.read_rows.mock_calls) == 3
+
+
+def test_table_read_retry_rows_no_full_table_scan():
+    from google.api_core import retry
+
+    credentials = _make_credentials()
+    client = _make_client(project="project-id", credentials=credentials, admin=True)
+    data_api = client._table_data_client = _make_data_api()
+    instance = client.instance(instance_id=INSTANCE_ID)
+    table = _make_table(TABLE_ID, instance)
+
+    retry_read_rows = retry.Retry(predicate=_read_rows_retry_exception)
+
+    # Create response_iterator
+    chunk_1 = _ReadRowsResponseCellChunkPB(
+        row_key=ROW_KEY_2,
+        family_name=FAMILY_NAME,
+        qualifier=QUALIFIER,
+        timestamp_micros=TIMESTAMP_MICROS,
+        value=VALUE,
+        commit_row=True,
+    )
+
+    response_1 = _ReadRowsResponseV2([chunk_1])
+    response_failure_iterator_2 = _MockFailureIterator_2([response_1])
+
+    data_api.table_path.return_value = (
+        f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}"
+    )
+
+    data_api.read_rows.side_effect = [
+        response_failure_iterator_2,
+    ]
+
+    rows = [
+        row
+        for row in table.read_rows(
+            start_key="doesn't matter", end_key=ROW_KEY_2, retry=retry_read_rows
+        )
+    ]
+    assert len(rows) == 1
+    result = rows[0]
+    assert result.row_key == ROW_KEY_2
+
+    assert len(data_api.read_rows.mock_calls) == 1
+    assert (
+        len(data_api.read_rows.mock_calls[0].args[0].rows.row_ranges) > 0
+    )  # not empty row_ranges


def test_table_yield_retry_rows():