Skip to content

Commit

Permalink
fix: mutations batcher race condition (#896)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Dec 12, 2023
1 parent 4f050aa commit fe58f61
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 52 deletions.
118 changes: 67 additions & 51 deletions google/cloud/bigtable/batcher.py
Expand Up @@ -53,12 +53,19 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT
self.flush_count = flush_count

def get(self):
"""Retrieve an item from the queue. Recalculate queue size."""
row = self._queue.get()
mutation_size = row.get_mutations_size()
self.total_mutation_count -= len(row._get_mutations())
self.total_size -= mutation_size
return row
"""
Retrieve an item from the queue. Recalculate queue size.
If the queue is empty, return None.
"""
try:
row = self._queue.get_nowait()
mutation_size = row.get_mutations_size()
self.total_mutation_count -= len(row._get_mutations())
self.total_size -= mutation_size
return row
except queue.Empty:
return None

def put(self, item):
"""Insert an item to the queue. Recalculate queue size."""
Expand All @@ -79,9 +86,6 @@ def full(self):
return True
return False

def empty(self):
return self._queue.empty()


@dataclass
class _BatchInfo:
Expand Down Expand Up @@ -292,8 +296,10 @@ def flush(self):
* :exc:`.batcherMutationsBatchError` if there's any error in the mutations.
"""
rows_to_flush = []
while not self._rows.empty():
rows_to_flush.append(self._rows.get())
row = self._rows.get()
while row is not None:
rows_to_flush.append(row)
row = self._rows.get()
response = self._flush_rows(rows_to_flush)
return response

Expand All @@ -303,58 +309,68 @@ def _flush_async(self):
:raises:
* :exc:`.batcherMutationsBatchError` if there's any error in the mutations.
"""

rows_to_flush = []
mutations_count = 0
mutations_size = 0
rows_count = 0
batch_info = _BatchInfo()

while not self._rows.empty():
row = self._rows.get()
mutations_count += len(row._get_mutations())
mutations_size += row.get_mutations_size()
rows_count += 1
rows_to_flush.append(row)
batch_info.mutations_count = mutations_count
batch_info.rows_count = rows_count
batch_info.mutations_size = mutations_size

if (
rows_count >= self.flush_count
or mutations_size >= self.max_row_bytes
or mutations_count >= self.flow_control.max_mutations
or mutations_size >= self.flow_control.max_mutation_bytes
or self._rows.empty() # submit when it reached the end of the queue
next_row = self._rows.get()
while next_row is not None:
# start a new batch
rows_to_flush = [next_row]
batch_info = _BatchInfo(
mutations_count=len(next_row._get_mutations()),
rows_count=1,
mutations_size=next_row.get_mutations_size(),
)
# fill up batch with rows
next_row = self._rows.get()
while next_row is not None and self._row_fits_in_batch(
next_row, batch_info
):
# wait for resources to become available, before submitting any new batch
self.flow_control.wait()
# once unblocked, submit a batch
# event flag will be set by control_flow to block subsequent thread, but not blocking this one
self.flow_control.control_flow(batch_info)
future = self._executor.submit(self._flush_rows, rows_to_flush)
self.futures_mapping[future] = batch_info
future.add_done_callback(self._batch_completed_callback)

# reset and start a new batch
rows_to_flush = []
mutations_size = 0
rows_count = 0
mutations_count = 0
batch_info = _BatchInfo()
rows_to_flush.append(next_row)
batch_info.mutations_count += len(next_row._get_mutations())
batch_info.rows_count += 1
batch_info.mutations_size += next_row.get_mutations_size()
next_row = self._rows.get()
# send batch over network
# wait for resources to become available
self.flow_control.wait()
# once unblocked, submit the batch
# event flag will be set by control_flow to block subsequent thread, but not blocking this one
self.flow_control.control_flow(batch_info)
future = self._executor.submit(self._flush_rows, rows_to_flush)
# schedule release of resources from flow control
self.futures_mapping[future] = batch_info
future.add_done_callback(self._batch_completed_callback)

def _batch_completed_callback(self, future):
"""Callback for when the mutation has finished to clean up the current batch
and release items from the flow controller.
Raise exceptions if there's any.
Release the resources locked by the flow control and allow enqueued tasks to be run.
"""

processed_rows = self.futures_mapping[future]
self.flow_control.release(processed_rows)
del self.futures_mapping[future]

def _row_fits_in_batch(self, row, batch_info):
"""Checks if a row can fit in the current batch.
:type row: class
:param row: :class:`~google.cloud.bigtable.row.DirectRow`.
:type batch_info: :class:`_BatchInfo`
:param batch_info: Information about the current batch.
:rtype: bool
:returns: True if the row can fit in the current batch.
"""
new_rows_count = batch_info.rows_count + 1
new_mutations_count = batch_info.mutations_count + len(row._get_mutations())
new_mutations_size = batch_info.mutations_size + row.get_mutations_size()
return (
new_rows_count <= self.flush_count
and new_mutations_size <= self.max_row_bytes
and new_mutations_count <= self.flow_control.max_mutations
and new_mutations_size <= self.flow_control.max_mutation_bytes
)

def _flush_rows(self, rows_to_flush):
"""Mutate the specified rows.
Expand Down
2 changes: 1 addition & 1 deletion tests/system/conftest.py
Expand Up @@ -58,7 +58,7 @@ def location_id():

@pytest.fixture(scope="session")
def serve_nodes():
return 3
return 1


@pytest.fixture(scope="session")
Expand Down
36 changes: 36 additions & 0 deletions tests/system/test_data_api.py
Expand Up @@ -381,3 +381,39 @@ def test_access_with_non_admin_client(data_client, data_instance_id, data_table_
instance = data_client.instance(data_instance_id)
table = instance.table(data_table_id)
assert table.read_row("nonesuch") is None # no raise


def test_mutations_batcher_threading(data_table, rows_to_delete):
"""
Test the mutations batcher by sending a bunch of mutations using different
flush methods
"""
import mock
import time
from google.cloud.bigtable.batcher import MutationsBatcher

num_sent = 20
all_results = []

def callback(results):
all_results.extend(results)

# override flow control max elements
with mock.patch("google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", 2):
with MutationsBatcher(
data_table,
flush_count=5,
flush_interval=0.07,
batch_completed_callback=callback,
) as batcher:
# send mutations in a way that timed flushes and count flushes interleave
for i in range(num_sent):
row = data_table.direct_row("row{}".format(i))
row.set_cell(
COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8")
)
rows_to_delete.append(row)
batcher.mutate(row)
time.sleep(0.01)
# ensure all mutations were sent
assert len(all_results) == num_sent

0 comments on commit fe58f61

Please sign in to comment.