Skip to content
This repository was archived by the owner on May 6, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .kokoro/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/service-account.json
# Setup project id.
export PROJECT_ID=$(cat "${KOKORO_GFILE_DIR}/project-id.json")

# Configure Local Redis to be used
# Configure local Redis to be used
export REDIS_CACHE_URL=redis://localhost
redis-server &

# Configure local memcached to be used
export MEMCACHED_HOSTS=localhost
service memcached start

# Some system tests require indexes. Use gcloud to create them.
gcloud auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS --project=$PROJECT_ID
gcloud --quiet --verbosity=debug datastore indexes create tests/system/index.yaml
Expand Down
1 change: 1 addition & 0 deletions .kokoro/docker/docs/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ RUN apt-get update \
libsnappy-dev \
libssl-dev \
libsqlite3-dev \
memcached \
portaudio19-dev \
redis-server \
software-properties-common \
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
("py:class", "Tuple"),
("py:class", "Union"),
("py:class", "redis.Redis"),
("py:class", "pymemcache.Client"),
]

# Add any Sphinx extension module names here, as strings. They can be
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/ndb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from google.cloud.ndb._datastore_query import Cursor
from google.cloud.ndb._datastore_query import QueryIterator
from google.cloud.ndb.global_cache import GlobalCache
from google.cloud.ndb.global_cache import MemcacheCache
from google.cloud.ndb.global_cache import RedisCache
from google.cloud.ndb.key import Key
from google.cloud.ndb.model import BlobKey
Expand Down Expand Up @@ -171,6 +172,7 @@
"KindError",
"LocalStructuredProperty",
"make_connection",
"MemcacheCache",
"MetaModel",
"Model",
"ModelAdapter",
Expand Down
120 changes: 120 additions & 0 deletions google/cloud/ndb/global_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
"""GlobalCache interface and its implementations."""

import abc
import base64
import collections
import os
import threading
import time
import uuid

import pymemcache
import redis as redis_module


Expand Down Expand Up @@ -282,3 +284,121 @@ def compare_and_swap(self, items, expires=None):
self.pipes.pop(key, None)

return results


class MemcacheCache(GlobalCache):
"""Memcache implementation of the :class:`GlobalCache`.

This is a synchronous implementation. The idea is that calls to Memcache
should be fast enough not to warrant the added complexity of an
asynchronous implementation.

Args:
client (pymemcache.Client): Instance of Memcache client to use.
"""

@staticmethod
def _parse_host_string(host_string):
split = host_string.split(":")
if len(split) == 1:
return split[0], 11211

elif len(split) == 2:
host, port = split
try:
port = int(port)
return host, port
except ValueError:
pass

raise ValueError("Invalid memcached host_string: {}".format(host_string))

@staticmethod
def _key(key):
return base64.b64encode(key)

@classmethod
def from_environment(cls, max_pool_size=4):
"""Generate a ``pymemcache.Client`` from an environment variable.

This class method looks for the ``MEMCACHED_HOSTS`` environment
variable and, if it is set, parses the value as a space delimited list of
hostnames, optionally with ports. For example:

"localhost"
"localhost:11211"
"1.1.1.1:11211 2.2.2.2:11211 3.3.3.3:11211"

Returns:
Optional[MemcacheCache]: A :class:`MemcacheCache` instance or
:data:`None`, if ``MEMCACHED_HOSTS`` is not set in the
environment.
"""
hosts_string = os.environ.get("MEMCACHED_HOSTS")
if not hosts_string:
return None

hosts = [
cls._parse_host_string(host_string.strip())
for host_string in hosts_string.split()
]

if not max_pool_size:
max_pool_size = 1

if len(hosts) == 1:
client = pymemcache.PooledClient(hosts[0], max_pool_size=max_pool_size)

else:
client = pymemcache.HashClient(
hosts, use_pooling=True, max_pool_size=max_pool_size
)

return cls(client)

def __init__(self, client):
self.client = client
self._cas = threading.local()

@property
def caskeys(self):
local = self._cas
if not hasattr(local, "caskeys"):
local.caskeys = {}
return local.caskeys

def get(self, keys):
"""Implements :meth:`GlobalCache.get`."""
keys = [self._key(key) for key in keys]
result = self.client.get_many(keys)
return [result.get(key) for key in keys]

def set(self, items, expires=None):
"""Implements :meth:`GlobalCache.set`."""
items = {self._key(key): value for key, value in items.items()}
expires = expires if expires else 0
self.client.set_many(items, expire=expires)

def delete(self, keys):
"""Implements :meth:`GlobalCache.delete`."""
keys = [self._key(key) for key in keys]
self.client.delete_many(keys)

def watch(self, keys):
"""Implements :meth:`GlobalCache.watch`."""
keys = [self._key(key) for key in keys]
caskeys = self.caskeys
for key, (value, caskey) in self.client.gets_many(keys).items():
caskeys[key] = caskey

def compare_and_swap(self, items, expires=None):
"""Implements :meth:`GlobalCache.compare_and_swap`."""
caskeys = self.caskeys
for key, value in items.items():
key = self._key(key)
caskey = caskeys.pop(key, None)
if caskey is None:
continue

expires = expires if expires else 0
self.client.cas(key, value, caskey, expire=expires)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def main():
readme = readme_file.read()
dependencies = [
"google-cloud-datastore >= 1.7.0",
"pymemcache",
"redis",
]

Expand Down
8 changes: 8 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,11 @@ def redis_context(client_context):
with client_context.new(global_cache=global_cache).use() as context:
context.set_global_cache_policy(None) # Use default
yield context


@pytest.fixture
def memcache_context(client_context):
global_cache = global_cache_module.MemcacheCache.from_environment()
with client_context.new(global_cache=global_cache).use() as context:
context.set_global_cache_policy(None) # Use default
yield context
83 changes: 83 additions & 0 deletions tests/system/test_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from . import KIND, eventually, equals

USE_REDIS_CACHE = bool(os.environ.get("REDIS_CACHE_URL"))
USE_MEMCACHE = bool(os.environ.get("MEMCACHED_HOSTS"))


def _assert_contemporaneous(timestamp1, timestamp2, delta_margin=2):
Expand Down Expand Up @@ -149,6 +150,37 @@ class SomeKind(ndb.Model):
assert entity.baz == "night"


@pytest.mark.skipif(not USE_MEMCACHE, reason="Memcache is not configured")
def test_retrieve_entity_with_memcache(ds_entity, memcache_context):
entity_id = test_utils.system.unique_resource_id()
ds_entity(KIND, entity_id, foo=42, bar="none", baz=b"night")

class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()
bar = ndb.StringProperty()
baz = ndb.StringProperty()

key = ndb.Key(KIND, entity_id)
entity = key.get()
assert isinstance(entity, SomeKind)
assert entity.foo == 42
assert entity.bar == "none"
assert entity.baz == "night"

cache_key = _cache.global_cache_key(key._key)
cache_key = global_cache_module.MemcacheCache._key(cache_key)
assert memcache_context.global_cache.client.get(cache_key) is not None

patch = mock.patch("google.cloud.ndb._datastore_api._LookupBatch.add")
patch.side_effect = Exception("Shouldn't call this")
with patch:
entity = key.get()
assert isinstance(entity, SomeKind)
assert entity.foo == 42
assert entity.bar == "none"
assert entity.baz == "night"


@pytest.mark.usefixtures("client_context")
def test_retrieve_entity_not_found(ds_entity):
entity_id = test_utils.system.unique_resource_id()
Expand Down Expand Up @@ -586,6 +618,33 @@ class SomeKind(ndb.Model):
assert redis_context.global_cache.redis.get(cache_key) is None


@pytest.mark.skipif(not USE_MEMCACHE, reason="Memcache is not configured")
def test_insert_entity_with_memcache(dispose_of, memcache_context):
class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()
bar = ndb.StringProperty()

entity = SomeKind(foo=42, bar="none")
key = entity.put()
dispose_of(key._key)
cache_key = _cache.global_cache_key(key._key)
cache_key = global_cache_module.MemcacheCache._key(cache_key)
assert memcache_context.global_cache.client.get(cache_key) is None

retrieved = key.get()
assert retrieved.foo == 42
assert retrieved.bar == "none"

assert memcache_context.global_cache.client.get(cache_key) is not None

entity.foo = 43
entity.put()

# This is py27 behavior. I can see a case being made for caching the
# entity on write rather than waiting for a subsequent lookup.
assert memcache_context.global_cache.client.get(cache_key) is None


@pytest.mark.usefixtures("client_context")
def test_update_entity(ds_entity):
entity_id = test_utils.system.unique_resource_id()
Expand Down Expand Up @@ -750,6 +809,30 @@ class SomeKind(ndb.Model):
assert redis_context.global_cache.redis.get(cache_key) == b"0"


@pytest.mark.skipif(not USE_MEMCACHE, reason="Memcache is not configured")
def test_delete_entity_with_memcache(ds_entity, memcache_context):
entity_id = test_utils.system.unique_resource_id()
ds_entity(KIND, entity_id, foo=42)

class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()

key = ndb.Key(KIND, entity_id)
cache_key = _cache.global_cache_key(key._key)
cache_key = global_cache_module.MemcacheCache._key(cache_key)

assert key.get().foo == 42
assert memcache_context.global_cache.client.get(cache_key) is not None

assert key.delete() is None
assert memcache_context.global_cache.client.get(cache_key) is None

# This is py27 behavior. Not entirely sold on leaving _LOCKED value for
# Datastore misses.
assert key.get() is None
assert memcache_context.global_cache.client.get(cache_key) == b"0"


@pytest.mark.usefixtures("client_context")
def test_delete_entity_in_transaction(ds_entity):
entity_id = test_utils.system.unique_resource_id()
Expand Down
Loading