Skip to content

Commit

Permalink
fix: Query options were not respecting use_cache (#873)
Browse files Browse the repository at this point in the history
In certain circumstances, we were not respecting use_cache
for queries, unlike legacy NDB, which is quite emphatic
about supporting them.
(See https://github.com/GoogleCloudPlatform/datastore-ndb-python/blob/59cb209ed95480025d26531fc91397575438d2fe/ndb/query.py#L186-L187)

In #613 we tried to match legacy NDB behavior by updating the cache
using the results of queries. We still do that, but now we respect
use_cache, which was a valid keyword argument for Query.fetch()
and friends, but was not passed down to the context cache when
needed.

As a result, the cache could mysteriously accumulate lots of memory
usage and perhaps even cause you to hit memory limits, even if it was
seemingly disabled and it didn't look like there were any objects
holding references to your query results.
This is a problem for certain batch-style workloads where you know
you're only interested in processing a certain entity once.

Fixes #752
  • Loading branch information
rwhogg committed Feb 27, 2023
1 parent 982ee5f commit 802d88d
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 4 deletions.
12 changes: 8 additions & 4 deletions google/cloud/ndb/_datastore_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def _next_batch(self):
self._start_cursor = query.start_cursor
self._index = 0
self._batch = [
_Result(result_type, result_pb, query.order_by)
_Result(result_type, result_pb, query.order_by, query_options=query)
for result_pb in response.batch.entity_results
]

Expand Down Expand Up @@ -755,17 +755,21 @@ class _Result(object):
order_by (Optional[Sequence[query.PropertyOrder]]): Ordering for the
query. Used to merge sorted result sets while maintaining sort
order.
query_options (Optional[QueryOptions]): Other query_options.
use_cache is the only supported option.
"""

_key = None

def __init__(self, result_type, result_pb, order_by=None):
def __init__(self, result_type, result_pb, order_by=None, query_options=None):
self.result_type = result_type
self.result_pb = result_pb
self.order_by = order_by

self.cursor = Cursor(result_pb.cursor)

self._query_options = query_options

def __lt__(self, other):
"""For total ordering."""
return self._compare(other) == -1
Expand Down Expand Up @@ -854,7 +858,7 @@ def check_cache(self, context):
will cause `None` to be recorded in the cache.
"""
key = self.key()
if context._use_cache(key):
if context._use_cache(key, self._query_options):
try:
return context.cache.get_and_validate(key)
except KeyError:
Expand All @@ -880,7 +884,7 @@ def entity(self):
if entity is _KEY_NOT_IN_CACHE:
# entity not in cache, create one, and then add it to cache
entity = model._entity_from_protobuf(self.result_pb.entity)
if context._use_cache(entity.key):
if context._use_cache(entity.key, self._query_options):
context.cache[entity.key] = entity
return entity

Expand Down
69 changes: 69 additions & 0 deletions tests/system/test_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,36 @@ class SomeKind(ndb.Model):
assert not cache_value


def test_insert_entity_with_use_global_cache_false(dispose_of, client_context):
class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()
bar = ndb.StringProperty()

global_cache = global_cache_module._InProcessGlobalCache()
with client_context.new(global_cache=global_cache).use() as context:
context.set_global_cache_policy(None) # Use default

entity = SomeKind(foo=42, bar="none")
key = entity.put(use_global_cache=False)
dispose_of(key._key)
cache_key = _cache.global_cache_key(key._key)
cache_value = global_cache.get([cache_key])[0]
assert not cache_value

retrieved = key.get(use_global_cache=False)
assert retrieved.foo == 42
assert retrieved.bar == "none"

cache_value = global_cache.get([cache_key])[0]
assert not cache_value

entity.foo = 43
entity.put(use_global_cache=False)

cache_value = global_cache.get([cache_key])[0]
assert not cache_value


@pytest.mark.skipif(not USE_REDIS_CACHE, reason="Redis is not configured")
def test_insert_entity_with_redis_cache(dispose_of, redis_context):
class SomeKind(ndb.Model):
Expand Down Expand Up @@ -1873,3 +1903,42 @@ class SomeKind(ndb.Model):
dispose_of(key._key)

assert key.get().sub_model.data["test"] == 1


def test_put_updates_cache(client_context, dispose_of):
class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()

client_context.set_cache_policy(None) # Use default

entity = SomeKind(foo=42)
key = entity.put()
assert len(client_context.cache) == 1
dispose_of(key._key)


def test_put_with_use_cache_true_updates_cache(client_context, dispose_of):
class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()

client_context.set_cache_policy(None) # Use default

entity = SomeKind(foo=42)
key = entity.put(use_cache=True)
assert len(client_context.cache) == 1
assert client_context.cache[key] is entity

dispose_of(key._key)


def test_put_with_use_cache_false_does_not_update_cache(client_context, dispose_of):
class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()

client_context.set_cache_policy(None) # Use default

entity = SomeKind(foo=42)
key = entity.put(use_cache=False)
assert len(client_context.cache) == 0

dispose_of(key._key)
26 changes: 26 additions & 0 deletions tests/system/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2010,3 +2010,29 @@ class SomeKind(ndb.Model):

# If there is a cache hit, we'll get back the same object, not just a copy
assert key.get() is retrieved


def test_query_with_explicit_use_cache_updates_cache(dispose_of, client_context):
class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()

entity = SomeKind(foo=42)
key = entity.put(use_cache=False)
dispose_of(key._key)
assert len(client_context.cache) == 0

eventually(lambda: SomeKind.query().fetch(use_cache=True), length_equals(1))
assert len(client_context.cache) == 1


def test_query_with_use_cache_false_does_not_update_cache(dispose_of, client_context):
class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()

entity = SomeKind(foo=42)
key = entity.put(use_cache=False)
dispose_of(key._key)
assert len(client_context.cache) == 0

eventually(lambda: SomeKind.query().fetch(use_cache=False), length_equals(1))
assert len(client_context.cache) == 0
82 changes: 82 additions & 0 deletions tests/unit/test__datastore_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,31 @@ def probably_has_next(self):


class Test_Result:
@staticmethod
def test_constructor_defaults():
result = _datastore_query._Result(
result_type=None,
result_pb=query_pb2.EntityResult(),
)
assert result.order_by is None
assert result._query_options is None

@staticmethod
def test_constructor_order_by():
order = query_module.PropertyOrder("foo")
result = _datastore_query._Result(
result_type=None, result_pb=query_pb2.EntityResult(), order_by=[order]
)
assert result.order_by == [order]

@staticmethod
def test_constructor_query_options():
options = query_module.QueryOptions(use_cache=False)
result = _datastore_query._Result(
result_type=None, result_pb=query_pb2.EntityResult(), query_options=options
)
assert result._query_options == options

@staticmethod
def test_total_ordering():
def result(foo, bar=0, baz=""):
Expand Down Expand Up @@ -1660,9 +1685,15 @@ def test_entity_full_entity(model):
mock.Mock(entity=entity_pb, cursor=b"123", spec=("entity", "cursor")),
)

context = context_module.get_context()

assert len(context.cache) == 0
assert result.entity() is entity
model._entity_from_protobuf.assert_called_once_with(entity_pb)

# Regression test for #752: ensure cache is updated after querying
assert len(context.cache) == 1

@staticmethod
@pytest.mark.usefixtures("in_context")
@mock.patch("google.cloud.ndb._datastore_query.model")
Expand Down Expand Up @@ -1703,6 +1734,57 @@ def test_entity_full_entity_no_cache(model):
)
assert result.entity() is entity

# Regression test for #752: ensure cache does not grow (i.e. use up memory)
assert len(context.cache) == 0

@staticmethod
@pytest.mark.usefixtures("in_context")
@mock.patch("google.cloud.ndb._datastore_query.model")
def test_entity_full_entity_no_cache_via_cache_options(model):
context = context_module.get_context()
with context.new().use():
key_pb = entity_pb2.Key(
partition_id=entity_pb2.PartitionId(project_id="testing"),
path=[entity_pb2.Key.PathElement(kind="ThisKind", id=42)],
)
entity = mock.Mock(key=key_pb)
model._entity_from_protobuf.return_value = entity
result = _datastore_query._Result(
_datastore_query.RESULT_TYPE_FULL,
mock.Mock(entity=entity, cursor=b"123", spec=("entity", "cursor")),
query_options=query_module.QueryOptions(use_cache=False),
)
assert result.entity() is entity

# Regression test for #752: ensure cache does not grow (i.e. use up memory)
assert len(context.cache) == 0

@staticmethod
@pytest.mark.usefixtures("in_context")
@mock.patch("google.cloud.ndb._datastore_query.model")
def test_entity_full_entity_cache_options_true(model):
key_pb = entity_pb2.Key(
partition_id=entity_pb2.PartitionId(project_id="testing"),
path=[entity_pb2.Key.PathElement(kind="ThisKind", id=42)],
)
entity_pb = mock.Mock(key=key_pb)
entity = mock.Mock(key=key_module.Key("ThisKind", 42))
model._entity_from_protobuf.return_value = entity
result = _datastore_query._Result(
_datastore_query.RESULT_TYPE_FULL,
mock.Mock(entity=entity_pb, cursor=b"123", spec=("entity", "cursor")),
query_options=query_module.QueryOptions(use_cache=True),
)

context = context_module.get_context()

assert len(context.cache) == 0
assert result.entity() is entity
model._entity_from_protobuf.assert_called_once_with(entity_pb)

# Regression test for #752: ensure cache is updated after querying
assert len(context.cache) == 1

@staticmethod
@pytest.mark.usefixtures("in_context")
def test_entity_key_only():
Expand Down

0 comments on commit 802d88d

Please sign in to comment.