Skip to content

Commit

Permalink
Implement use_datastore flag. (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Rossi committed Aug 14, 2019
1 parent 819ce4f commit 9883379
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 115 deletions.
93 changes: 60 additions & 33 deletions src/google/cloud/ndb/_datastore_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,19 @@ def lookup(key, options):
either an entity protocol buffer or _NOT_FOUND.
"""
context = context_module.get_context()
use_global_cache = context._use_global_cache(key, options)
use_datastore = context._use_datastore(key, options)
in_transaction = bool(_get_transaction(options))
if use_datastore and in_transaction:
use_global_cache = False
else:
use_global_cache = context._use_global_cache(key, options)

if not (use_global_cache or use_datastore):
raise TypeError(
"use_global_cache and use_datastore can't both be False"
)

entity_pb = None
entity_pb = _NOT_FOUND
key_locked = False

if use_global_cache:
Expand All @@ -150,20 +160,21 @@ def lookup(key, options):
entity_pb = entity_pb2.Entity()
entity_pb.MergeFromString(result)

else:
elif use_datastore:
yield _cache.global_lock(cache_key)
yield _cache.global_watch(cache_key)

if entity_pb is None:
if entity_pb is _NOT_FOUND and use_datastore:
batch = _batch.get_batch(_LookupBatch, options)
entity_pb = yield batch.add(key)

if use_global_cache and not key_locked and entity_pb is not _NOT_FOUND:
expires = context._global_cache_timeout(key, options)
serialized = entity_pb.SerializeToString()
yield _cache.global_compare_and_swap(
cache_key, serialized, expires=expires
)
# Do not cache misses
if use_global_cache and not key_locked and entity_pb is not _NOT_FOUND:
expires = context._global_cache_timeout(key, options)
serialized = entity_pb.SerializeToString()
yield _cache.global_compare_and_swap(
cache_key, serialized, expires=expires
)

return entity_pb

Expand Down Expand Up @@ -368,27 +379,39 @@ def put(entity, options):
"""
context = context_module.get_context()
use_global_cache = context._use_global_cache(entity.key, options)
use_datastore = context._use_datastore(entity.key, options)
if not (use_global_cache or use_datastore):
raise TypeError(
"use_global_cache and use_datastore can't both be False"
)

entity_pb = helpers.entity_to_protobuf(entity)
cache_key = _cache.global_cache_key(entity.key)
if use_global_cache and not entity.key.is_partial:
yield _cache.global_lock(cache_key)

transaction = _get_transaction(options)
if transaction:
batch = _get_commit_batch(transaction, options)
else:
batch = _batch.get_batch(_NonTransactionalCommitBatch, options)
if use_datastore:
yield _cache.global_lock(cache_key)
else:
expires = context._global_cache_timeout(entity.key, options)
cache_value = entity_pb.SerializeToString()
yield _cache.global_set(cache_key, cache_value, expires=expires)

if use_datastore:
transaction = _get_transaction(options)
if transaction:
batch = _get_commit_batch(transaction, options)
else:
batch = _batch.get_batch(_NonTransactionalCommitBatch, options)

entity_pb = helpers.entity_to_protobuf(entity)
key_pb = yield batch.put(entity_pb)
if key_pb:
key = helpers.key_from_protobuf(key_pb)
else:
key = None
key_pb = yield batch.put(entity_pb)
if key_pb:
key = helpers.key_from_protobuf(key_pb)
else:
key = None

if use_global_cache:
yield _cache.global_delete(cache_key)
if use_global_cache:
yield _cache.global_delete(cache_key)

return key
return key


@tasklets.tasklet
Expand All @@ -408,18 +431,22 @@ def delete(key, options):
"""
context = context_module.get_context()
use_global_cache = context._use_global_cache(key, options)
use_datastore = context._use_datastore(key, options)

if use_global_cache:
cache_key = _cache.global_cache_key(key)
yield _cache.global_lock(cache_key)

transaction = _get_transaction(options)
if transaction:
batch = _get_commit_batch(transaction, options)
else:
batch = _batch.get_batch(_NonTransactionalCommitBatch, options)
if use_datastore:
if use_global_cache:
yield _cache.global_lock(cache_key)

transaction = _get_transaction(options)
if transaction:
batch = _get_commit_batch(transaction, options)
else:
batch = _batch.get_batch(_NonTransactionalCommitBatch, options)

yield batch.delete(key)
yield batch.delete(key)

if use_global_cache:
yield _cache.global_delete(cache_key)
Expand Down
4 changes: 0 additions & 4 deletions src/google/cloud/ndb/_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class Options:
"use_cache",
"use_global_cache",
"global_cache_timeout",
# Not yet implemented
"use_datastore",
# Might or might not implement
"force_writes",
Expand Down Expand Up @@ -155,9 +154,6 @@ def __init__(self, config=None, **kwargs):
)
)

if self.use_datastore is not None:
raise NotImplementedError

if self.max_memcache_items is not None:
raise NotImplementedError

Expand Down
145 changes: 78 additions & 67 deletions src/google/cloud/ndb/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,66 +65,71 @@ def get_context():
raise exceptions.ContextError()


def _default_cache_policy(key):
"""The default cache policy.
def _default_policy(attr_name, value_type):
"""Factory for producing default policies.
Defers to ``_use_cache`` on the Model class for the key's kind.
Born of the observation that all default policies are more less the
same—they defer to some attribute on the model class for the key's kind and
expects the value to be either of a particular type or a callable.
See: :meth:`~google.cloud.ndb.context.Context.set_cache_policy`
Returns:
Callable[[key], value_type]: A policy function suitable for use as a
default policy.
"""
flag = None
if key is not None:
modelclass = model.Model._kind_map.get(key.kind())
if modelclass is not None:
policy = getattr(modelclass, "_use_cache", None)
if policy is not None:
if isinstance(policy, bool):
flag = policy
else:
flag = policy(key)

return flag
def policy(key):
value = None
if key is not None:
kind = key.kind
if callable(kind):
kind = kind()
modelclass = model.Model._kind_map.get(kind)
if modelclass is not None:
policy = getattr(modelclass, attr_name, None)
if policy is not None:
if isinstance(policy, value_type):
value = policy
else:
value = policy(key)

return value

def _default_global_cache_policy(key):
"""The default global cache policy.
return policy

Defers to ``_use_global_cache`` on the Model class for the key's kind.
See: :meth:`~google.cloud.ndb.context.Context.set_global_cache_policy`
"""
flag = None
if key is not None:
modelclass = model.Model._kind_map.get(key.kind)
if modelclass is not None:
policy = getattr(modelclass, "_use_global_cache", None)
if policy is not None:
if isinstance(policy, bool):
flag = policy
else:
flag = policy(key)

return flag


def _default_global_cache_timeout_policy(key):
"""The default global cache timeout policy.
Defers to ``_global_cache_timeout`` on the Model class for the key's kind.
See:
:meth:`~google.cloud.ndb.context.Context.set_global_cache_timeout_policy`
"""
timeout = None
if key is not None:
modelclass = model.Model._kind_map.get(key.kind)
if modelclass is not None:
policy = getattr(modelclass, "_global_cache_timeout", None)
if policy is not None:
if isinstance(policy, int):
timeout = policy
else:
timeout = policy(key)

return timeout
_default_cache_policy = _default_policy("_use_cache", bool)
"""The default cache policy.
Defers to ``_use_cache`` on the Model class for the key's kind.
See: :meth:`~google.cloud.ndb.context.Context.set_cache_policy`
"""

_default_global_cache_policy = _default_policy("_use_global_cache", bool)
"""The default global cache policy.
Defers to ``_use_global_cache`` on the Model class for the key's kind.
See: :meth:`~google.cloud.ndb.context.Context.set_global_cache_policy`
"""

_default_global_cache_timeout_policy = _default_policy(
"_global_cache_timeout", int
)
"""The default global cache timeout policy.
Defers to ``_global_cache_timeout`` on the Model class for the key's kind.
See: :meth:`~google.cloud.ndb.context.Context.set_global_cache_timeout_policy`
"""

_default_datastore_policy = _default_policy("_use_datastore", bool)
"""The default datastore policy.
Defers to ``_use_datastore`` on the Model class for the key's kind.
See: :meth:`~google.cloud.ndb.context.Context.set_datastore_policy`
"""


_ContextTuple = collections.namedtuple(
Expand Down Expand Up @@ -172,6 +177,7 @@ def __new__(
global_cache=None,
global_cache_policy=None,
global_cache_timeout_policy=None,
datastore_policy=None,
):
if eventloop is None:
eventloop = _eventloop.EventLoop()
Expand Down Expand Up @@ -206,6 +212,7 @@ def __new__(
context.set_cache_policy(cache_policy)
context.set_global_cache_policy(global_cache_policy)
context.set_global_cache_timeout_policy(global_cache_timeout_policy)
context.set_datastore_policy(datastore_policy)

return context

Expand Down Expand Up @@ -283,6 +290,15 @@ def _global_cache_timeout(self, key, options):
timeout = self.global_cache_timeout_policy(key)
return timeout

def _use_datastore(self, key, options=None):
"""Return whether to use the Datastore for this key."""
flag = options.use_datastore if options else None
if flag is None:
flag = self.datastore_policy(key)
if flag is None:
flag = True
return flag


class Context(_Context):
"""User management of cache and other policy."""
Expand Down Expand Up @@ -376,7 +392,16 @@ def set_datastore_policy(self, policy):
positional argument and returns a ``bool`` indicating if it
should use the datastore. May be :data:`None`.
"""
raise NotImplementedError
if policy is None:
policy = _default_datastore_policy

elif isinstance(policy, bool):
flag = policy

def policy(key):
return flag

self.datastore_policy = policy

def set_global_cache_policy(self, policy):
"""Set the memcache policy function.
Expand Down Expand Up @@ -454,20 +479,6 @@ def in_transaction(self):
"""
return self.transaction is not None

@staticmethod
def default_datastore_policy(key):
"""Default cache policy.
This defers to ``Model._use_datastore``.
Args:
key (google.cloud.ndb.key.Key): The key.
Returns:
Union[bool, None]: Whether to use datastore.
"""
raise NotImplementedError

def memcache_add(self, *args, **kwargs):
"""Direct pass-through to memcache client."""
raise exceptions.NoLongerImplementedError()
Expand Down
5 changes: 4 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ def context():
project="testing", namespace=None, spec=("project", "namespace")
)
context = context_module.Context(
client, stub=mock.Mock(spec=()), eventloop=TestingEventLoop()
client,
stub=mock.Mock(spec=()),
eventloop=TestingEventLoop(),
datastore_policy=True,
)
return context

Expand Down
30 changes: 30 additions & 0 deletions tests/system/test_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,3 +798,33 @@ class SomeKind(ndb.Model):

with pytest.raises(ndb.exceptions.BadValueError):
entity.put()


@mock.patch(
"google.cloud.ndb._datastore_api.make_call",
mock.Mock(side_effect=Exception("Datastore shouldn't get called.")),
)
def test_crud_without_datastore(ds_entity, client_context):
entity_id = test_utils.system.unique_resource_id()

class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()
bar = ndb.StringProperty()
baz = ndb.StringProperty()

global_cache = global_cache_module._InProcessGlobalCache()
with client_context.new(global_cache=global_cache).use() as context:
context.set_global_cache_policy(None) # Use default
context.set_datastore_policy(False) # Don't use Datastore

key = ndb.Key(KIND, entity_id)
SomeKind(foo=42, bar="none", baz="night", _key=key).put()

entity = key.get()
assert isinstance(entity, SomeKind)
assert entity.foo == 42
assert entity.bar == "none"
assert entity.baz == "night"

key.delete()
assert key.get() is None
Loading

0 comments on commit 9883379

Please sign in to comment.