From e796d91a1c573f7efee551f849cd129798493c88 Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Tue, 8 Sep 2020 15:35:13 -0400 Subject: [PATCH] feat: memcached integration Adds a new `GlobalCache` implementation, `MemcacheCache`, which allows memcached to be used as a global cache. May be used with a Google Memorystore, or any configured memcached instance. --- .kokoro/build.sh | 6 +- .kokoro/docker/docs/Dockerfile | 1 + docs/conf.py | 1 + google/cloud/ndb/__init__.py | 2 + google/cloud/ndb/global_cache.py | 120 +++++++++++++++++++ setup.py | 1 + tests/system/conftest.py | 8 ++ tests/system/test_crud.py | 83 ++++++++++++++ tests/unit/test_global_cache.py | 190 +++++++++++++++++++++++++++++++ 9 files changed, 411 insertions(+), 1 deletion(-) diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 8ef9ba5f..08c70053 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -34,10 +34,14 @@ export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/service-account.json # Setup project id. export PROJECT_ID=$(cat "${KOKORO_GFILE_DIR}/project-id.json") -# Configure Local Redis to be used +# Configure local Redis to be used export REDIS_CACHE_URL=redis://localhost redis-server & +# Configure local memcached to be used +export MEMCACHED_HOSTS=localhost +service memcached start + # Some system tests require indexes. Use gcloud to create them. gcloud auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS --project=$PROJECT_ID gcloud --quiet --verbosity=debug datastore indexes create tests/system/index.yaml diff --git a/.kokoro/docker/docs/Dockerfile b/.kokoro/docker/docs/Dockerfile index 412b0b56..8f8e81cf 100644 --- a/.kokoro/docker/docs/Dockerfile +++ b/.kokoro/docker/docs/Dockerfile @@ -39,6 +39,7 @@ RUN apt-get update \ libsnappy-dev \ libssl-dev \ libsqlite3-dev \ + memcached \ portaudio19-dev \ redis-server \ software-properties-common \ diff --git a/docs/conf.py b/docs/conf.py index c8f109a1..12d39b88 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -53,6 +53,7 @@ ("py:class", "Tuple"), ("py:class", "Union"), ("py:class", "redis.Redis"), + ("py:class", "pymemcache.Client"), ] # Add any Sphinx extension module names here, as strings. They can be diff --git a/google/cloud/ndb/__init__.py b/google/cloud/ndb/__init__.py index a1c4bce8..c7475006 100644 --- a/google/cloud/ndb/__init__.py +++ b/google/cloud/ndb/__init__.py @@ -38,6 +38,7 @@ from google.cloud.ndb._datastore_query import Cursor from google.cloud.ndb._datastore_query import QueryIterator from google.cloud.ndb.global_cache import GlobalCache +from google.cloud.ndb.global_cache import MemcacheCache from google.cloud.ndb.global_cache import RedisCache from google.cloud.ndb.key import Key from google.cloud.ndb.model import BlobKey @@ -171,6 +172,7 @@ "KindError", "LocalStructuredProperty", "make_connection", + "MemcacheCache", "MetaModel", "Model", "ModelAdapter", diff --git a/google/cloud/ndb/global_cache.py b/google/cloud/ndb/global_cache.py index a46ed626..ddd9458a 100644 --- a/google/cloud/ndb/global_cache.py +++ b/google/cloud/ndb/global_cache.py @@ -15,12 +15,14 @@ """GlobalCache interface and its implementations.""" import abc +import base64 import collections import os import threading import time import uuid +import pymemcache import redis as redis_module @@ -282,3 +284,121 @@ def compare_and_swap(self, items, expires=None): self.pipes.pop(key, None) return results + + +class MemcacheCache(GlobalCache): + """Memcache implementation of the :class:`GlobalCache`. + + This is a synchronous implementation. The idea is that calls to Memcache + should be fast enough not to warrant the added complexity of an + asynchronous implementation. + + Args: + client (pymemcache.Client): Instance of Memcache client to use. + """ + + @staticmethod + def _parse_host_string(host_string): + split = host_string.split(":") + if len(split) == 1: + return split[0], 11211 + + elif len(split) == 2: + host, port = split + try: + port = int(port) + return host, port + except ValueError: + pass + + raise ValueError("Invalid memcached host_string: {}".format(host_string)) + + @staticmethod + def _key(key): + return base64.b64encode(key) + + @classmethod + def from_environment(cls, max_pool_size=4): + """Generate a ``pymemcache.Client`` from an environment variable. + + This class method looks for the ``MEMCACHED_HOSTS`` environment + variable and, if it is set, parses the value as a space delimited list of + hostnames, optionally with ports. For example: + + "localhost" + "localhost:11211" + "1.1.1.1:11211 2.2.2.2:11211 3.3.3.3:11211" + + Returns: + Optional[MemcacheCache]: A :class:`MemcacheCache` instance or + :data:`None`, if ``MEMCACHED_HOSTS`` is not set in the + environment. + """ + hosts_string = os.environ.get("MEMCACHED_HOSTS") + if not hosts_string: + return None + + hosts = [ + cls._parse_host_string(host_string.strip()) + for host_string in hosts_string.split() + ] + + if not max_pool_size: + max_pool_size = 1 + + if len(hosts) == 1: + client = pymemcache.PooledClient(hosts[0], max_pool_size=max_pool_size) + + else: + client = pymemcache.HashClient( + hosts, use_pooling=True, max_pool_size=max_pool_size + ) + + return cls(client) + + def __init__(self, client): + self.client = client + self._cas = threading.local() + + @property + def caskeys(self): + local = self._cas + if not hasattr(local, "caskeys"): + local.caskeys = {} + return local.caskeys + + def get(self, keys): + """Implements :meth:`GlobalCache.get`.""" + keys = [self._key(key) for key in keys] + result = self.client.get_many(keys) + return [result.get(key) for key in keys] + + def set(self, items, expires=None): + """Implements :meth:`GlobalCache.set`.""" + items = {self._key(key): value for key, value in items.items()} + expires = expires if expires else 0 + self.client.set_many(items, expire=expires) + + def delete(self, keys): + """Implements :meth:`GlobalCache.delete`.""" + keys = [self._key(key) for key in keys] + self.client.delete_many(keys) + + def watch(self, keys): + """Implements :meth:`GlobalCache.watch`.""" + keys = [self._key(key) for key in keys] + caskeys = self.caskeys + for key, (value, caskey) in self.client.gets_many(keys).items(): + caskeys[key] = caskey + + def compare_and_swap(self, items, expires=None): + """Implements :meth:`GlobalCache.compare_and_swap`.""" + caskeys = self.caskeys + for key, value in items.items(): + key = self._key(key) + caskey = caskeys.pop(key, None) + if caskey is None: + continue + + expires = expires if expires else 0 + self.client.cas(key, value, caskey, expire=expires) diff --git a/setup.py b/setup.py index 6be67236..eb195b7c 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,7 @@ def main(): readme = readme_file.read() dependencies = [ "google-cloud-datastore >= 1.7.0", + "pymemcache", "redis", ] diff --git a/tests/system/conftest.py b/tests/system/conftest.py index da44bbed..1878a7b5 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -149,3 +149,11 @@ def redis_context(client_context): with client_context.new(global_cache=global_cache).use() as context: context.set_global_cache_policy(None) # Use default yield context + + +@pytest.fixture +def memcache_context(client_context): + global_cache = global_cache_module.MemcacheCache.from_environment() + with client_context.new(global_cache=global_cache).use() as context: + context.set_global_cache_policy(None) # Use default + yield context diff --git a/tests/system/test_crud.py b/tests/system/test_crud.py index 67a174c0..2d4fa3d7 100644 --- a/tests/system/test_crud.py +++ b/tests/system/test_crud.py @@ -39,6 +39,7 @@ from . import KIND, eventually, equals USE_REDIS_CACHE = bool(os.environ.get("REDIS_CACHE_URL")) +USE_MEMCACHE = bool(os.environ.get("MEMCACHED_HOSTS")) def _assert_contemporaneous(timestamp1, timestamp2, delta_margin=2): @@ -149,6 +150,37 @@ class SomeKind(ndb.Model): assert entity.baz == "night" +@pytest.mark.skipif(not USE_MEMCACHE, reason="Memcache is not configured") +def test_retrieve_entity_with_memcache(ds_entity, memcache_context): + entity_id = test_utils.system.unique_resource_id() + ds_entity(KIND, entity_id, foo=42, bar="none", baz=b"night") + + class SomeKind(ndb.Model): + foo = ndb.IntegerProperty() + bar = ndb.StringProperty() + baz = ndb.StringProperty() + + key = ndb.Key(KIND, entity_id) + entity = key.get() + assert isinstance(entity, SomeKind) + assert entity.foo == 42 + assert entity.bar == "none" + assert entity.baz == "night" + + cache_key = _cache.global_cache_key(key._key) + cache_key = global_cache_module.MemcacheCache._key(cache_key) + assert memcache_context.global_cache.client.get(cache_key) is not None + + patch = mock.patch("google.cloud.ndb._datastore_api._LookupBatch.add") + patch.side_effect = Exception("Shouldn't call this") + with patch: + entity = key.get() + assert isinstance(entity, SomeKind) + assert entity.foo == 42 + assert entity.bar == "none" + assert entity.baz == "night" + + @pytest.mark.usefixtures("client_context") def test_retrieve_entity_not_found(ds_entity): entity_id = test_utils.system.unique_resource_id() @@ -586,6 +618,33 @@ class SomeKind(ndb.Model): assert redis_context.global_cache.redis.get(cache_key) is None +@pytest.mark.skipif(not USE_MEMCACHE, reason="Memcache is not configured") +def test_insert_entity_with_memcache(dispose_of, memcache_context): + class SomeKind(ndb.Model): + foo = ndb.IntegerProperty() + bar = ndb.StringProperty() + + entity = SomeKind(foo=42, bar="none") + key = entity.put() + dispose_of(key._key) + cache_key = _cache.global_cache_key(key._key) + cache_key = global_cache_module.MemcacheCache._key(cache_key) + assert memcache_context.global_cache.client.get(cache_key) is None + + retrieved = key.get() + assert retrieved.foo == 42 + assert retrieved.bar == "none" + + assert memcache_context.global_cache.client.get(cache_key) is not None + + entity.foo = 43 + entity.put() + + # This is py27 behavior. I can see a case being made for caching the + # entity on write rather than waiting for a subsequent lookup. + assert memcache_context.global_cache.client.get(cache_key) is None + + @pytest.mark.usefixtures("client_context") def test_update_entity(ds_entity): entity_id = test_utils.system.unique_resource_id() @@ -750,6 +809,30 @@ class SomeKind(ndb.Model): assert redis_context.global_cache.redis.get(cache_key) == b"0" +@pytest.mark.skipif(not USE_MEMCACHE, reason="Memcache is not configured") +def test_delete_entity_with_memcache(ds_entity, memcache_context): + entity_id = test_utils.system.unique_resource_id() + ds_entity(KIND, entity_id, foo=42) + + class SomeKind(ndb.Model): + foo = ndb.IntegerProperty() + + key = ndb.Key(KIND, entity_id) + cache_key = _cache.global_cache_key(key._key) + cache_key = global_cache_module.MemcacheCache._key(cache_key) + + assert key.get().foo == 42 + assert memcache_context.global_cache.client.get(cache_key) is not None + + assert key.delete() is None + assert memcache_context.global_cache.client.get(cache_key) is None + + # This is py27 behavior. Not entirely sold on leaving _LOCKED value for + # Datastore misses. + assert key.get() is None + assert memcache_context.global_cache.client.get(cache_key) == b"0" + + @pytest.mark.usefixtures("client_context") def test_delete_entity_in_transaction(ds_entity): entity_id = test_utils.system.unique_resource_id() diff --git a/tests/unit/test_global_cache.py b/tests/unit/test_global_cache.py index 06823204..c1de46b6 100644 --- a/tests/unit/test_global_cache.py +++ b/tests/unit/test_global_cache.py @@ -294,3 +294,193 @@ def mock_expire(key, expires): assert cache.pipes == {"whatevs": global_cache._Pipeline(None, "himom!")} assert expired == {"ay": 32, "be": 32, "see": 32} + + +class TestMemcacheCache: + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.pymemcache") + def test_from_environment_not_configured(pymemcache): + with mock.patch.dict("os.environ", {"MEMCACHED_HOSTS": None}): + assert global_cache.MemcacheCache.from_environment() is None + + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.pymemcache") + def test_from_environment_one_host_no_port(pymemcache): + with mock.patch.dict("os.environ", {"MEMCACHED_HOSTS": "somehost"}): + cache = global_cache.MemcacheCache.from_environment() + assert cache.client is pymemcache.PooledClient.return_value + pymemcache.PooledClient.assert_called_once_with( + ("somehost", 11211), max_pool_size=4 + ) + + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.pymemcache") + def test_from_environment_one_host_with_port(pymemcache): + with mock.patch.dict("os.environ", {"MEMCACHED_HOSTS": "somehost:22422"}): + cache = global_cache.MemcacheCache.from_environment() + assert cache.client is pymemcache.PooledClient.return_value + pymemcache.PooledClient.assert_called_once_with( + ("somehost", 22422), max_pool_size=4 + ) + + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.pymemcache") + def test_from_environment_two_hosts_with_port(pymemcache): + with mock.patch.dict( + "os.environ", {"MEMCACHED_HOSTS": "somehost:22422 otherhost:33633"} + ): + cache = global_cache.MemcacheCache.from_environment() + assert cache.client is pymemcache.HashClient.return_value + pymemcache.HashClient.assert_called_once_with( + [("somehost", 22422), ("otherhost", 33633)], + use_pooling=True, + max_pool_size=4, + ) + + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.pymemcache") + def test_from_environment_two_hosts_no_port(pymemcache): + with mock.patch.dict("os.environ", {"MEMCACHED_HOSTS": "somehost otherhost"}): + cache = global_cache.MemcacheCache.from_environment() + assert cache.client is pymemcache.HashClient.return_value + pymemcache.HashClient.assert_called_once_with( + [("somehost", 11211), ("otherhost", 11211)], + use_pooling=True, + max_pool_size=4, + ) + + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.pymemcache") + def test_from_environment_one_host_no_port_pool_size_zero(pymemcache): + with mock.patch.dict("os.environ", {"MEMCACHED_HOSTS": "somehost"}): + cache = global_cache.MemcacheCache.from_environment(max_pool_size=0) + assert cache.client is pymemcache.PooledClient.return_value + pymemcache.PooledClient.assert_called_once_with( + ("somehost", 11211), max_pool_size=1 + ) + + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.pymemcache") + def test_from_environment_bad_host_extra_colon(pymemcache): + with mock.patch.dict("os.environ", {"MEMCACHED_HOSTS": "somehost:say:what?"}): + with pytest.raises(ValueError): + global_cache.MemcacheCache.from_environment() + + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.pymemcache") + def test_from_environment_bad_host_port_not_an_integer(pymemcache): + with mock.patch.dict("os.environ", {"MEMCACHED_HOSTS": "somehost:saywhat?"}): + with pytest.raises(ValueError): + global_cache.MemcacheCache.from_environment() + + @staticmethod + def test_get(): + client = mock.Mock(spec=("get_many",)) + cache = global_cache.MemcacheCache(client) + key1 = cache._key(b"one") + key2 = cache._key(b"two") + client.get_many.return_value = {key1: "bun", key2: "shoe"} + assert cache.get((b"one", b"two")) == ["bun", "shoe"] + client.get_many.assert_called_once_with([key1, key2]) + + @staticmethod + def test_set(): + client = mock.Mock(spec=("set_many",)) + cache = global_cache.MemcacheCache(client) + key1 = cache._key(b"one") + key2 = cache._key(b"two") + cache.set( + { + b"one": "bun", + b"two": "shoe", + } + ) + client.set_many.assert_called_once_with( + { + key1: "bun", + key2: "shoe", + }, + expire=0, + ) + + @staticmethod + def test_set_w_expires(): + client = mock.Mock(spec=("set_many",)) + cache = global_cache.MemcacheCache(client) + key1 = cache._key(b"one") + key2 = cache._key(b"two") + cache.set( + { + b"one": "bun", + b"two": "shoe", + }, + expires=5, + ) + client.set_many.assert_called_once_with( + { + key1: "bun", + key2: "shoe", + }, + expire=5, + ) + + @staticmethod + def test_delete(): + client = mock.Mock(spec=("delete_many",)) + cache = global_cache.MemcacheCache(client) + key1 = cache._key(b"one") + key2 = cache._key(b"two") + cache.delete((b"one", b"two")) + client.delete_many.assert_called_once_with([key1, key2]) + + @staticmethod + def test_watch(): + client = mock.Mock(spec=("gets_many",)) + cache = global_cache.MemcacheCache(client) + key1 = cache._key(b"one") + key2 = cache._key(b"two") + client.gets_many.return_value = { + key1: ("bun", b"0"), + key2: ("shoe", b"1"), + } + cache.watch((b"one", b"two")) + client.gets_many.assert_called_once_with([key1, key2]) + assert cache.caskeys == { + key1: b"0", + key2: b"1", + } + + @staticmethod + def test_compare_and_swap(): + client = mock.Mock(spec=("cas",)) + cache = global_cache.MemcacheCache(client) + key2 = cache._key(b"two") + cache.caskeys[key2] = b"5" + cache.caskeys["whatevs"] = b"6" + cache.compare_and_swap( + { + b"one": "bun", + b"two": "shoe", + } + ) + + client.cas.assert_called_once_with(key2, "shoe", b"5", expire=0) + assert cache.caskeys == {"whatevs": b"6"} + + @staticmethod + def test_compare_and_swap_and_expires(): + client = mock.Mock(spec=("cas",)) + cache = global_cache.MemcacheCache(client) + key2 = cache._key(b"two") + cache.caskeys[key2] = b"5" + cache.caskeys["whatevs"] = b"6" + cache.compare_and_swap( + { + b"one": "bun", + b"two": "shoe", + }, + expires=5, + ) + + client.cas.assert_called_once_with(key2, "shoe", b"5", expire=5) + assert cache.caskeys == {"whatevs": b"6"}