fix: handle empty last chunk correctly in 'Query._chunkify' (#489)
Closes #487.
Supersedes #488.
tseaver committed Nov 15, 2021
1 parent eb8366e commit 3ddc718
Showing 11 changed files with 298 additions and 120 deletions.
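
The bug in question: when the number of matching documents is an exact multiple of the chunk size, the final page fetched by `_chunkify` comes back empty, and unconditionally reading `snapshots[-1]` raises an IndexError. A minimal, self-contained sketch of the failure mode and of the guard this commit applies (plain lists stand in for query pages; no Firestore client involved):

```python
# Plain-list illustration of the empty-last-chunk problem (not library code).
pages = [["doc1", "doc2"], ["doc3", "doc4"], []]  # 4 docs, chunk_size=2 -> empty final page


def chunkify_buggy(pages):
    for snapshots in pages:
        last_document = snapshots[-1]  # IndexError when the final page is empty
        yield snapshots


def chunkify_fixed(pages):
    last_document = None
    for snapshots in pages:
        if snapshots:  # the guard added by this commit
            last_document = snapshots[-1]
        yield snapshots


print(list(chunkify_fixed(pages)))  # [['doc1', 'doc2'], ['doc3', 'doc4'], []]
```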
8 changes: 4 additions & 4 deletions google/cloud/firestore_v1/async_client.py
@@ -331,22 +331,22 @@ async def recursive_delete(
The BulkWriter used to delete all matching documents. Supply this
if you want to override the default throttling behavior.
"""
if bulk_writer is None:
bulk_writer = self.bulk_writer()

return await self._recursive_delete(
reference, bulk_writer=bulk_writer, chunk_size=chunk_size,
)

async def _recursive_delete(
self,
reference: Union[AsyncCollectionReference, AsyncDocumentReference],
bulk_writer: "BulkWriter",
*,
bulk_writer: Optional["BulkWriter"] = None, # type: ignore
chunk_size: Optional[int] = 5000,
depth: Optional[int] = 0,
) -> int:
"""Recursion helper for `recursive_delete."""
from google.cloud.firestore_v1.bulk_writer import BulkWriter

bulk_writer = bulk_writer or BulkWriter()

num_deleted: int = 0

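With this change, `recursive_delete` resolves a default `BulkWriter` up front and hands it to the private helper, rather than the helper creating one per call. A hedged usage sketch (the project ID and collection name are placeholders):

```python
import asyncio

from google.cloud import firestore
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode


async def wipe_collection() -> int:
    client = firestore.AsyncClient(project="my-project")  # placeholder project
    # Supplying a BulkWriter is optional; omit it to accept the default throttling.
    bw = client.bulk_writer(options=BulkWriterOptions(mode=SendMode.parallel))
    return await client.recursive_delete(
        client.collection("philosophers"), bulk_writer=bw, chunk_size=1000,
    )


if __name__ == "__main__":
    print(asyncio.run(wipe_collection()))
```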
13 changes: 5 additions & 8 deletions google/cloud/firestore_v1/async_query.py
@@ -130,13 +130,6 @@ def __init__(
async def _chunkify(
self, chunk_size: int
) -> AsyncGenerator[List[DocumentSnapshot], None]:
# Catch the edge case where a developer writes the following:
# `my_query.limit(500)._chunkify(1000)`, which ultimately nullifies any
# need to yield chunks.
if self._limit and chunk_size > self._limit:
yield await self.get()
return

max_to_return: Optional[int] = self._limit
num_returned: int = 0
original: AsyncQuery = self._copy()
@@ -150,11 +143,15 @@ async def _chunkify(
# Apply the optionally pruned limit and the cursor, if we are past
# the first page.
_q = original.limit(_chunk_size)

if last_document:
_q = _q.start_after(last_document)

snapshots = await _q.get()
last_document = snapshots[-1]

if snapshots:
last_document = snapshots[-1]

num_returned += len(snapshots)

yield snapshots
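The loop above pages through the query with `limit` plus `start_after`, trimming the chunk size whenever the query's own `limit` would otherwise be exceeded. A simplified stand-alone sketch of that pattern over a plain list (function and variable names are illustrative, not the library's):

```python
from typing import Iterator, List, Optional


def chunkify(docs: List[str], chunk_size: int, limit: Optional[int] = None) -> Iterator[List[str]]:
    """Yield successive chunks without ever exceeding an overall limit."""
    num_returned = 0
    cursor = 0
    while True:
        # Prune the chunk size so the overall limit is respected.
        _chunk_size = chunk_size if limit is None else min(chunk_size, limit - num_returned)

        snapshots = docs[cursor:cursor + _chunk_size]
        cursor += len(snapshots)
        num_returned += len(snapshots)
        yield snapshots  # the final chunk may legitimately be empty

        # A short (or empty) page, or hitting the limit, signals the end.
        if len(snapshots) < _chunk_size or (limit is not None and num_returned >= limit):
            return


print(list(chunkify(["a", "b", "c", "d"], 2)))  # [['a', 'b'], ['c', 'd'], []]
```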
7 changes: 6 additions & 1 deletion google/cloud/firestore_v1/bulk_writer.py
@@ -252,7 +252,7 @@ class BulkWriter(AsyncBulkWriterMixin):

def __init__(
self,
client: Optional["BaseClient"] = None,
client: "BaseClient" = None,
options: Optional["BulkWriterOptions"] = None,
):
# Because `BulkWriter` instances are all synchronous/blocking on the
@@ -895,6 +895,11 @@ def __init__(
self.mode = mode
self.retry = retry

def __eq__(self, other):
if not isinstance(other, self.__class__): # pragma: NO COVER
return NotImplemented
return self.__dict__ == other.__dict__

class BulkWriteFailure:
def __init__(
self,
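The new `__eq__` gives `BulkWriterOptions` value semantics, which is chiefly useful for asserting that two independently constructed option objects are equivalent, for example in tests. A short hedged sketch:

```python
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode

a = BulkWriterOptions(mode=SendMode.parallel)
b = BulkWriterOptions(mode=SendMode.parallel)
c = BulkWriterOptions(mode=SendMode.serial)

assert a == b           # identical attribute values compare equal
assert a != c           # differing mode -> unequal
assert a != "parallel"  # non-BulkWriterOptions comparisons return NotImplemented, so fall back
```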
18 changes: 6 additions & 12 deletions google/cloud/firestore_v1/client.py
Expand Up @@ -50,7 +50,6 @@
# Types needed only for Type Hints
from google.cloud.firestore_v1.base_document import DocumentSnapshot


if TYPE_CHECKING:
from google.cloud.firestore_v1.bulk_writer import BulkWriter # pragma: NO COVER

@@ -319,22 +318,20 @@ def recursive_delete(
if you want to override the default throttling behavior.
"""
return self._recursive_delete(
reference, bulk_writer=bulk_writer, chunk_size=chunk_size,
)
if bulk_writer is None:
bulk_writer = self.bulk_writer()

return self._recursive_delete(reference, bulk_writer, chunk_size=chunk_size,)

def _recursive_delete(
self,
reference: Union[CollectionReference, DocumentReference],
bulk_writer: "BulkWriter",
*,
bulk_writer: Optional["BulkWriter"] = None,
chunk_size: Optional[int] = 5000,
depth: Optional[int] = 0,
) -> int:
"""Recursion helper for `recursive_delete."""
from google.cloud.firestore_v1.bulk_writer import BulkWriter

bulk_writer = bulk_writer or BulkWriter()

num_deleted: int = 0

@@ -354,10 +351,7 @@ def _recursive_delete(
col_ref: CollectionReference
for col_ref in reference.collections():
num_deleted += self._recursive_delete(
col_ref,
bulk_writer=bulk_writer,
chunk_size=chunk_size,
depth=depth + 1,
col_ref, bulk_writer, chunk_size=chunk_size, depth=depth + 1,
)
num_deleted += 1
bulk_writer.delete(reference)
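Both clients now create the default `BulkWriter` exactly once in the public `recursive_delete` and thread it through `_recursive_delete`, so every level of the recursion shares one writer instead of conjuring its own. A generic sketch of that wrapper/helper split (class and method names are illustrative, not Firestore's):

```python
from typing import Iterable, Optional


class Writer:
    def delete(self, ref: str) -> None:
        print(f"delete {ref}")


class MiniClient:
    def make_writer(self) -> Writer:
        return Writer()

    def recursive_delete(self, ref: str, *, writer: Optional[Writer] = None) -> int:
        # Resolve the default once, at the top of the recursion.
        if writer is None:
            writer = self.make_writer()
        return self._recursive_delete(ref, writer)

    def _recursive_delete(self, ref: str, writer: Writer, depth: int = 0) -> int:
        # The helper requires a writer, so all recursion levels share the same one.
        deleted = 0
        for child in self._children(ref):
            deleted += self._recursive_delete(child, writer, depth + 1)
        writer.delete(ref)
        return deleted + 1

    def _children(self, ref: str) -> Iterable[str]:
        # Hypothetical traversal; the real client walks subcollections here.
        return ["a/x", "a/y"] if ref == "a" else []


print(MiniClient().recursive_delete("a"))  # 3
```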
12 changes: 5 additions & 7 deletions google/cloud/firestore_v1/query.py
@@ -171,12 +171,6 @@ def get(
def _chunkify(
self, chunk_size: int
) -> Generator[List[DocumentSnapshot], None, None]:
# Catch the edge case where a developer writes the following:
# `my_query.limit(500)._chunkify(1000)`, which ultimately nullifies any
# need to yield chunks.
if self._limit and chunk_size > self._limit:
yield self.get()
return

max_to_return: Optional[int] = self._limit
num_returned: int = 0
@@ -191,11 +185,15 @@ def _chunkify(
# Apply the optionally pruned limit and the cursor, if we are past
# the first page.
_q = original.limit(_chunk_size)

if last_document:
_q = _q.start_after(last_document)

snapshots = _q.get()
last_document = snapshots[-1]

if snapshots:
last_document = snapshots[-1]

num_returned += len(snapshots)

yield snapshots
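The same pagination idiom is available through the public API: fetch a page with `limit`, then resume after the last snapshot with `start_after`. A hedged usage sketch (project and collection names are placeholders):

```python
from google.cloud import firestore

client = firestore.Client(project="my-project")  # placeholder project
collection = client.collection("philosophers")   # placeholder collection

page = collection.order_by("name").limit(2).get()
while page:
    for snapshot in page:
        print(snapshot.id, snapshot.to_dict())
    # Resume after the last document of the previous page; an empty page ends the loop.
    page = (
        collection.order_by("name")
        .start_after(page[-1])
        .limit(2)
        .get()
    )
```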
47 changes: 33 additions & 14 deletions tests/system/test_system.py
@@ -1325,19 +1325,24 @@ def _persist_documents(
]


def _do_recursive_delete_with_bulk_writer(client, bulk_writer):
philosophers = [philosophers_data_set[0]]
_persist_documents(client, f"philosophers{UNIQUE_RESOURCE_ID}", philosophers)
def _do_recursive_delete(client, bulk_writer, empty_philosophers=False):

doc_paths = [
"",
"/pets/Scruffy",
"/pets/Snowflake",
"/hobbies/pontificating",
"/hobbies/journaling",
"/philosophers/Aristotle",
"/philosophers/Plato",
]
if empty_philosophers:
doc_paths = philosophers = []
else:
philosophers = [philosophers_data_set[0]]

doc_paths = [
"",
"/pets/Scruffy",
"/pets/Snowflake",
"/hobbies/pontificating",
"/hobbies/journaling",
"/philosophers/Aristotle",
"/philosophers/Plato",
]

_persist_documents(client, f"philosophers{UNIQUE_RESOURCE_ID}", philosophers)

# Assert all documents were created so that when they're missing after the
# delete, we're actually testing something.
@@ -1362,14 +1367,28 @@ def test_recursive_delete_parallelized(client, cleanup):
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode

bw = client.bulk_writer(options=BulkWriterOptions(mode=SendMode.parallel))
_do_recursive_delete_with_bulk_writer(client, bw)
_do_recursive_delete(client, bw)


def test_recursive_delete_serialized(client, cleanup):
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode

bw = client.bulk_writer(options=BulkWriterOptions(mode=SendMode.serial))
_do_recursive_delete_with_bulk_writer(client, bw)
_do_recursive_delete(client, bw)


def test_recursive_delete_parallelized_empty(client, cleanup):
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode

bw = client.bulk_writer(options=BulkWriterOptions(mode=SendMode.parallel))
_do_recursive_delete(client, bw, empty_philosophers=True)


def test_recursive_delete_serialized_empty(client, cleanup):
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode

bw = client.bulk_writer(options=BulkWriterOptions(mode=SendMode.serial))
_do_recursive_delete(client, bw, empty_philosophers=True)


def test_recursive_query(client, cleanup):
46 changes: 32 additions & 14 deletions tests/system/test_system_async.py
@@ -1184,22 +1184,26 @@ async def _persist_documents(
]


async def _do_recursive_delete_with_bulk_writer(client, bulk_writer):
philosophers = [philosophers_data_set[0]]
async def _do_recursive_delete(client, bulk_writer, empty_philosophers=False):

if empty_philosophers:
philosophers = doc_paths = []
else:
philosophers = [philosophers_data_set[0]]
doc_paths = [
"",
"/pets/Scruffy",
"/pets/Snowflake",
"/hobbies/pontificating",
"/hobbies/journaling",
"/philosophers/Aristotle",
"/philosophers/Plato",
]

await _persist_documents(
client, f"philosophers-async{UNIQUE_RESOURCE_ID}", philosophers
)

doc_paths = [
"",
"/pets/Scruffy",
"/pets/Snowflake",
"/hobbies/pontificating",
"/hobbies/journaling",
"/philosophers/Aristotle",
"/philosophers/Plato",
]

# Assert all documents were created so that when they're missing after the
# delete, we're actually testing something.
collection_ref = client.collection(f"philosophers-async{UNIQUE_RESOURCE_ID}")
@@ -1223,14 +1227,28 @@ async def test_async_recursive_delete_parallelized(client, cleanup):
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode

bw = client.bulk_writer(options=BulkWriterOptions(mode=SendMode.parallel))
await _do_recursive_delete_with_bulk_writer(client, bw)
await _do_recursive_delete(client, bw)


async def test_async_recursive_delete_serialized(client, cleanup):
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode

bw = client.bulk_writer(options=BulkWriterOptions(mode=SendMode.serial))
await _do_recursive_delete_with_bulk_writer(client, bw)
await _do_recursive_delete(client, bw)


async def test_async_recursive_delete_parallelized_empty(client, cleanup):
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode

bw = client.bulk_writer(options=BulkWriterOptions(mode=SendMode.parallel))
await _do_recursive_delete(client, bw, empty_philosophers=True)


async def test_async_recursive_delete_serialized_empty(client, cleanup):
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions, SendMode

bw = client.bulk_writer(options=BulkWriterOptions(mode=SendMode.serial))
await _do_recursive_delete(client, bw, empty_philosophers=True)


async def test_recursive_query(client, cleanup):