From fe58f617c7364d7e99e2ec50abd5f080852bf033 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Tue, 12 Dec 2023 13:58:06 -0800 Subject: [PATCH] fix: mutations batcher race condition (#896) --- google/cloud/bigtable/batcher.py | 118 ++++++++++++++++++------------- tests/system/conftest.py | 2 +- tests/system/test_data_api.py | 36 ++++++++++ 3 files changed, 104 insertions(+), 52 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index a6eb806e9..8f0cabadd 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -53,12 +53,19 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT self.flush_count = flush_count def get(self): - """Retrieve an item from the queue. Recalculate queue size.""" - row = self._queue.get() - mutation_size = row.get_mutations_size() - self.total_mutation_count -= len(row._get_mutations()) - self.total_size -= mutation_size - return row + """ + Retrieve an item from the queue. Recalculate queue size. + + If the queue is empty, return None. + """ + try: + row = self._queue.get_nowait() + mutation_size = row.get_mutations_size() + self.total_mutation_count -= len(row._get_mutations()) + self.total_size -= mutation_size + return row + except queue.Empty: + return None def put(self, item): """Insert an item to the queue. Recalculate queue size.""" @@ -79,9 +86,6 @@ def full(self): return True return False - def empty(self): - return self._queue.empty() - @dataclass class _BatchInfo: @@ -292,8 +296,10 @@ def flush(self): * :exc:`.batcherMutationsBatchError` if there's any error in the mutations. """ rows_to_flush = [] - while not self._rows.empty(): - rows_to_flush.append(self._rows.get()) + row = self._rows.get() + while row is not None: + rows_to_flush.append(row) + row = self._rows.get() response = self._flush_rows(rows_to_flush) return response @@ -303,58 +309,68 @@ def _flush_async(self): :raises: * :exc:`.batcherMutationsBatchError` if there's any error in the mutations. """ - - rows_to_flush = [] - mutations_count = 0 - mutations_size = 0 - rows_count = 0 - batch_info = _BatchInfo() - - while not self._rows.empty(): - row = self._rows.get() - mutations_count += len(row._get_mutations()) - mutations_size += row.get_mutations_size() - rows_count += 1 - rows_to_flush.append(row) - batch_info.mutations_count = mutations_count - batch_info.rows_count = rows_count - batch_info.mutations_size = mutations_size - - if ( - rows_count >= self.flush_count - or mutations_size >= self.max_row_bytes - or mutations_count >= self.flow_control.max_mutations - or mutations_size >= self.flow_control.max_mutation_bytes - or self._rows.empty() # submit when it reached the end of the queue + next_row = self._rows.get() + while next_row is not None: + # start a new batch + rows_to_flush = [next_row] + batch_info = _BatchInfo( + mutations_count=len(next_row._get_mutations()), + rows_count=1, + mutations_size=next_row.get_mutations_size(), + ) + # fill up batch with rows + next_row = self._rows.get() + while next_row is not None and self._row_fits_in_batch( + next_row, batch_info ): - # wait for resources to become available, before submitting any new batch - self.flow_control.wait() - # once unblocked, submit a batch - # event flag will be set by control_flow to block subsequent thread, but not blocking this one - self.flow_control.control_flow(batch_info) - future = self._executor.submit(self._flush_rows, rows_to_flush) - self.futures_mapping[future] = batch_info - future.add_done_callback(self._batch_completed_callback) - - # reset and start a new batch - rows_to_flush = [] - mutations_size = 0 - rows_count = 0 - mutations_count = 0 - batch_info = _BatchInfo() + rows_to_flush.append(next_row) + batch_info.mutations_count += len(next_row._get_mutations()) + batch_info.rows_count += 1 + batch_info.mutations_size += next_row.get_mutations_size() + next_row = self._rows.get() + # send batch over network + # wait for resources to become available + self.flow_control.wait() + # once unblocked, submit the batch + # event flag will be set by control_flow to block subsequent thread, but not blocking this one + self.flow_control.control_flow(batch_info) + future = self._executor.submit(self._flush_rows, rows_to_flush) + # schedule release of resources from flow control + self.futures_mapping[future] = batch_info + future.add_done_callback(self._batch_completed_callback) def _batch_completed_callback(self, future): """Callback for when the mutation has finished to clean up the current batch and release items from the flow controller. - Raise exceptions if there's any. Release the resources locked by the flow control and allow enqueued tasks to be run. """ - processed_rows = self.futures_mapping[future] self.flow_control.release(processed_rows) del self.futures_mapping[future] + def _row_fits_in_batch(self, row, batch_info): + """Checks if a row can fit in the current batch. + + :type row: class + :param row: :class:`~google.cloud.bigtable.row.DirectRow`. + + :type batch_info: :class:`_BatchInfo` + :param batch_info: Information about the current batch. + + :rtype: bool + :returns: True if the row can fit in the current batch. + """ + new_rows_count = batch_info.rows_count + 1 + new_mutations_count = batch_info.mutations_count + len(row._get_mutations()) + new_mutations_size = batch_info.mutations_size + row.get_mutations_size() + return ( + new_rows_count <= self.flush_count + and new_mutations_size <= self.max_row_bytes + and new_mutations_count <= self.flow_control.max_mutations + and new_mutations_size <= self.flow_control.max_mutation_bytes + ) + def _flush_rows(self, rows_to_flush): """Mutate the specified rows. diff --git a/tests/system/conftest.py b/tests/system/conftest.py index f39fcba88..910c20970 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -58,7 +58,7 @@ def location_id(): @pytest.fixture(scope="session") def serve_nodes(): - return 3 + return 1 @pytest.fixture(scope="session") diff --git a/tests/system/test_data_api.py b/tests/system/test_data_api.py index 2ca7e1504..579837e34 100644 --- a/tests/system/test_data_api.py +++ b/tests/system/test_data_api.py @@ -381,3 +381,39 @@ def test_access_with_non_admin_client(data_client, data_instance_id, data_table_ instance = data_client.instance(data_instance_id) table = instance.table(data_table_id) assert table.read_row("nonesuch") is None # no raise + + +def test_mutations_batcher_threading(data_table, rows_to_delete): + """ + Test the mutations batcher by sending a bunch of mutations using different + flush methods + """ + import mock + import time + from google.cloud.bigtable.batcher import MutationsBatcher + + num_sent = 20 + all_results = [] + + def callback(results): + all_results.extend(results) + + # override flow control max elements + with mock.patch("google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", 2): + with MutationsBatcher( + data_table, + flush_count=5, + flush_interval=0.07, + batch_completed_callback=callback, + ) as batcher: + # send mutations in a way that timed flushes and count flushes interleave + for i in range(num_sent): + row = data_table.direct_row("row{}".format(i)) + row.set_cell( + COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8") + ) + rows_to_delete.append(row) + batcher.mutate(row) + time.sleep(0.01) + # ensure all mutations were sent + assert len(all_results) == num_sent