Feature/add strategies (#17)
* First approach on how to implement cache strategies

* Added unit tests for checking calls

* Integrated with default policy for POC

* POC demonstrating LRUCache plus tests

* Added policy support in mget and mset
argaen committed Oct 13, 2016
1 parent df47a00 commit a27f37e
Showing 22 changed files with 396 additions and 28 deletions.
6 changes: 3 additions & 3 deletions README.rst
@@ -91,7 +91,7 @@ Sometimes, you will want to use this decorator with specific backend and seriali
return await aiocache.default_cache.get("key")
@cached(ttl=10, namespace="test")
@cached(ttl=10, namespace="test:")
async def decorator_example():
print("First ASYNC non cached call...")
await asyncio.sleep(1)
@@ -127,7 +127,7 @@ You can instantiate a cache class and interact with it as follows:
async def main():
cache = RedisCache(endpoint="127.0.0.1", port=6379, namespace="main")
cache = RedisCache(endpoint="127.0.0.1", port=6379, namespace="main:")
await cache.set("key", "value")
await cache.set("expire_me", "value", ttl=10) # Key will expire after 10 secs
print(await cache.get("key"))
@@ -154,7 +154,7 @@ In some cases, you may want to cache complex objects and depending on the backen
async def main():
cache = RedisCache(serializer=PickleSerializer(), namespace="default")
cache = RedisCache(serializer=PickleSerializer(), namespace="default:")
await cache.set("key", MyObject(x=1, y=2)) # This will serialize to pickle and store in redis with bytes format
my_object = await cache.get("key") # This will retrieve the object and deserialize back to MyObject
print("MyObject x={}, y={}".format(my_object.x, my_object.y))
8 changes: 8 additions & 0 deletions aiocache/backends/base.py
@@ -1,6 +1,7 @@
import abc

from aiocache.serializers import DefaultSerializer
from aiocache.policies import DefaultPolicy


class BaseCache(metaclass=abc.ABCMeta):
@@ -18,12 +19,19 @@ class BaseCache(metaclass=abc.ABCMeta):
def __init__(self, serializer=None, namespace=None, max_keys=None):

self.serializer = serializer or self.get_serializer()
self.policy = self.get_policy()
self.namespace = namespace or ""
self.max_keys = max_keys or None

def get_serializer(self):
return DefaultSerializer()

def get_policy(self):
return DefaultPolicy(self)

def set_policy(self, class_, *args, **kwargs):
self.policy = class_(self, *args, **kwargs)

@abc.abstractmethod
async def add(self, key, value, ttl=None): # pragma: no cover
pass
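The change to ``base.py`` wires a policy object into every cache: ``get_policy()`` supplies a no-op ``DefaultPolicy``, and ``set_policy(class_, *args, **kwargs)`` swaps it for a policy constructed with the cache itself as first argument. A minimal standalone sketch of that plumbing (``MiniCache`` and ``CountingPolicy`` are illustrative stand-ins, not aiocache classes):

```python
import asyncio


class DefaultPolicy:
    """No-op policy: hooks run around every set but do nothing."""
    def __init__(self, client):
        self.client = client

    async def pre_set(self, key, value):
        pass

    async def post_set(self, key, value):
        pass


class MiniCache:
    """Simplified stand-in for BaseCache showing the policy plumbing."""
    def __init__(self):
        self._store = {}
        self.policy = DefaultPolicy(self)

    def set_policy(self, class_, *args, **kwargs):
        # Same shape as the diff: the policy receives the cache instance.
        self.policy = class_(self, *args, **kwargs)

    async def set(self, key, value):
        await self.policy.pre_set(key, value)
        self._store[key] = value
        await self.policy.post_set(key, value)
        return True


class CountingPolicy(DefaultPolicy):
    """Example policy that counts write operations."""
    def __init__(self, client):
        super().__init__(client)
        self.sets = 0

    async def post_set(self, key, value):
        self.sets += 1


async def main():
    cache = MiniCache()
    cache.set_policy(CountingPolicy)
    await cache.set("a", 1)
    await cache.set("b", 2)
    return cache.policy.sets


print(asyncio.run(main()))  # 2
```

Because the policy holds a reference to its client, subclasses like the ``LRUPolicy`` below can call back into the cache (e.g. to delete evicted keys).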
47 changes: 37 additions & 10 deletions aiocache/backends/memory.py
@@ -22,7 +22,15 @@ async def get(self, key, default=None, loads_fn=None):
"""

loads = loads_fn or self.serializer.loads
return loads(SimpleMemoryCache._cache.get(self._build_key(key), default))
ns_key = self._build_key(key)

await self.policy.pre_get(key)
value = loads(SimpleMemoryCache._cache.get(ns_key))

if value:
await self.policy.post_get(key)

return value or default

async def multi_get(self, keys, loads_fn=None):
"""
@@ -33,7 +41,16 @@ async def multi_get(self, keys, loads_fn=None):
:returns: obj deserialized
"""
loads = loads_fn or self.serializer.loads
return [loads(SimpleMemoryCache._cache.get(self._build_key(key))) for key in keys]

for key in keys:
await self.policy.pre_get(key)

values = [loads(SimpleMemoryCache._cache.get(self._build_key(key))) for key in keys]

for key in keys:
await self.policy.post_get(key)

return values

async def set(self, key, value, ttl=None, dumps_fn=None):
"""
@@ -46,10 +63,15 @@ async def set(self, key, value, ttl=None, dumps_fn=None):
:returns: True
"""
dumps = dumps_fn or self.serializer.dumps
SimpleMemoryCache._cache[self._build_key(key)] = dumps(value)
ns_key = self._build_key(key)

await self.policy.pre_set(key, value)
SimpleMemoryCache._cache[ns_key] = dumps(value)
if ttl:
loop = asyncio.get_event_loop()
loop.call_later(ttl, self._delete, key)
loop.call_later(ttl, self._delete, ns_key)

await self.policy.post_set(key, value)
return True

async def multi_set(self, pairs, dumps_fn=None):
@@ -63,7 +85,9 @@ async def multi_set(self, pairs, dumps_fn=None):
dumps = dumps_fn or self.serializer.dumps

for key, value in pairs:
await self.policy.pre_set(key, value)
SimpleMemoryCache._cache[self._build_key(key)] = dumps(value)
await self.policy.post_set(key, value)
return True

async def add(self, key, value, ttl=None, dumps_fn=None):
@@ -79,16 +103,18 @@ async def add(self, key, value, ttl=None, dumps_fn=None):
:raises: Value error if key already exists
"""
dumps = dumps_fn or self.serializer.dumps
ns_key = self._build_key(key)

key = self._build_key(key)
if key in SimpleMemoryCache._cache:
if ns_key in SimpleMemoryCache._cache:
raise ValueError(
"Key {} already exists, use .set to update the value".format(key))
"Key {} already exists, use .set to update the value".format(ns_key))

SimpleMemoryCache._cache[self._build_key(key)] = dumps(value)
await self.policy.pre_set(key, value)
SimpleMemoryCache._cache[ns_key] = dumps(value)
if ttl:
loop = asyncio.get_event_loop()
loop.call_later(ttl, self._delete, key)
loop.call_later(ttl, self._delete, ns_key)
await self.policy.post_set(key, value)
return True

async def exists(self, key):
@@ -107,7 +133,8 @@ async def delete(self, key):
:param key: Key to be deleted
:returns: int number of deleted keys
"""
key = self._build_key(key)
return self._delete(key)

def _delete(self, key):
return SimpleMemoryCache._cache.pop(self._build_key(key), 0)
return SimpleMemoryCache._cache.pop(key, 0)
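Besides the policy hooks, the memory-backend change fixes a subtle TTL bug: ``set`` stored the value under the namespaced key but previously scheduled ``loop.call_later`` to delete the raw key, so expiry never removed anything. A minimal standalone sketch of the corrected expiry mechanism (``TinyMemoryCache`` is an illustrative stand-in, and the TTLs are shortened for the demo):

```python
import asyncio


class TinyMemoryCache:
    """Stripped-down version of the call_later expiry used in the diff."""
    _cache = {}

    def _build_key(self, key, namespace="main:"):
        return namespace + key

    async def set(self, key, value, ttl=None):
        ns_key = self._build_key(key)
        TinyMemoryCache._cache[ns_key] = value
        if ttl:
            # Schedule deletion of the *namespaced* key, as the fix does.
            # (The diff calls get_event_loop(); get_running_loop() is the
            # modern equivalent inside a coroutine.)
            loop = asyncio.get_running_loop()
            loop.call_later(ttl, self._delete, ns_key)
        return True

    async def get(self, key, default=None):
        return TinyMemoryCache._cache.get(self._build_key(key), default)

    def _delete(self, key):
        return TinyMemoryCache._cache.pop(key, 0)


async def main():
    cache = TinyMemoryCache()
    await cache.set("expire_me", "value", ttl=0.05)
    before = await cache.get("expire_me")
    await asyncio.sleep(0.1)  # let the expiry timer fire
    after = await cache.get("expire_me")
    return before, after


print(asyncio.run(main()))  # ('value', 'None' is returned as None)
```

The same mismatch existed in ``add``, which the diff fixes identically by passing ``ns_key`` to ``call_later``.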
56 changes: 45 additions & 11 deletions aiocache/backends/redis.py
@@ -8,7 +8,7 @@

class RedisCache(BaseCache):

def __init__(self, endpoint=None, port=None, loop=None, *args, **kwargs):
def __init__(self, *args, endpoint=None, port=None, loop=None, **kwargs):
super().__init__(*args, **kwargs)
self.endpoint = endpoint or "127.0.0.1"
self.port = port or 6379
@@ -28,10 +28,17 @@ async def get(self, key, default=None, loads_fn=None, encoding=None):

loads = loads_fn or self.serializer.loads
encoding = encoding or getattr(self.serializer, "encoding", 'utf-8')
ns_key = self._build_key(key)

await self.policy.pre_get(key)

with await self._connect() as redis:
return loads(
await redis.get(self._build_key(key), encoding=encoding)) or default
value = loads(await redis.get(ns_key, encoding=encoding))

if value:
await self.policy.post_get(key)

return value or default

async def multi_get(self, keys, loads_fn=None, encoding=None):
"""
@@ -45,9 +52,17 @@ async def multi_get(self, keys, loads_fn=None, encoding=None):
loads = loads_fn or self.serializer.loads
encoding = encoding or getattr(self.serializer, "encoding", 'utf-8')

for key in keys:
await self.policy.pre_get(key)

with await self._connect() as redis:
keys = [self._build_key(key) for key in keys]
return [loads(obj) for obj in (await redis.mget(*keys, encoding=encoding))]
ns_keys = [self._build_key(key) for key in keys]
values = [loads(obj) for obj in (await redis.mget(*ns_keys, encoding=encoding))]

for key in keys:
await self.policy.post_get(key)

return values

async def set(self, key, value, ttl=None, dumps_fn=None):
"""
@@ -61,9 +76,15 @@ async def set(self, key, value, ttl=None, dumps_fn=None):
"""
dumps = dumps_fn or self.serializer.dumps
ttl = ttl or 0
ns_key = self._build_key(key)

await self.policy.pre_set(key, value)

with await self._connect() as redis:
return await redis.set(self._build_key(key), dumps(value), expire=ttl)
ret = await redis.set(ns_key, dumps(value), expire=ttl)

await self.policy.post_set(key, value)
return ret

async def multi_set(self, pairs, dumps_fn=None):
"""
@@ -75,11 +96,19 @@ async def multi_set(self, pairs, dumps_fn=None):
"""
dumps = dumps_fn or self.serializer.dumps

for key, value in pairs:
await self.policy.pre_set(key, value)

with await self._connect() as redis:
serialized_pairs = list(
chain.from_iterable(
(self._build_key(key), dumps(value)) for key, value in pairs))
return await redis.mset(*serialized_pairs)
ret = await redis.mset(*serialized_pairs)

for key, value in pairs:
await self.policy.post_set(key, value)

return ret

async def add(self, key, value, ttl=None, dumps_fn=None):
"""
@@ -95,13 +124,18 @@ async def add(self, key, value, ttl=None, dumps_fn=None):
"""
dumps = dumps_fn or self.serializer.dumps
ttl = ttl or 0
ns_key = self._build_key(key)

await self.policy.pre_set(key, value)

key = self._build_key(key)
with await self._connect() as redis:
if await redis.exists(key):
if await redis.exists(ns_key):
raise ValueError(
"Key {} already exists, use .set to update the value".format(key))
return await redis.set(key, dumps(value), expire=ttl)
"Key {} already exists, use .set to update the value".format(ns_key))
ret = await redis.set(ns_key, dumps(value), expire=ttl)

await self.policy.post_set(key, value)
return ret

async def exists(self, key):
"""
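In ``multi_set`` above, ``chain.from_iterable`` flattens the ``(key, value)`` pairs because Redis ``MSET`` takes its arguments interleaved (``key1 val1 key2 val2 …``). A small standalone sketch of that flattening step, assuming ``_build_key`` simply prefixes the namespace as the README examples suggest (``build_key`` and ``flatten_pairs`` are illustrative helpers, not aiocache functions):

```python
from itertools import chain


def build_key(key, namespace="main:"):
    # Assumed behavior of BaseCache._build_key: prefix key with the namespace.
    return namespace + key


def flatten_pairs(pairs, dumps=str):
    """Interleave namespaced keys and serialized values, as redis.mset(*args) expects."""
    return list(
        chain.from_iterable((build_key(k), dumps(v)) for k, v in pairs))


print(flatten_pairs([("a", 1), ("b", 2)]))
# ['main:a', '1', 'main:b', '2']
```

The real code uses ``self.serializer.dumps`` (or a caller-supplied ``dumps_fn``) in place of the ``str`` placeholder here.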
66 changes: 66 additions & 0 deletions aiocache/policies.py
@@ -0,0 +1,66 @@
import logging

from collections import deque


logger = logging.getLogger(__name__)


class DefaultPolicy:
"""
Default and base policy. It is used by all backends by default and its hooks do nothing.
:param client: backend instance used to interact with the storage.
"""

def __init__(self, client):
self.client = client

async def pre_get(self, key):
pass

async def post_get(self, key):
pass

async def pre_set(self, key, value):
pass

async def post_set(self, key, value):
pass


class LRUPolicy(DefaultPolicy):
"""
Implements a Least Recently Used policy with max_keys. The policy does the following:
- When a key is retrieved (get or mget), it is moved to the beginning of the queue
- When a key is added (set, mset or add), it is added to the beginning of the queue. If
the queue is full, as many keys as needed are removed to make space for the new
ones.
"""
def __init__(self, *args, max_keys=None, **kwargs):
super().__init__(*args, **kwargs)
if max_keys is not None:
assert max_keys >= 1, "Number of keys must be 1 or bigger"
self.dq = deque(maxlen=max_keys)

async def post_get(self, key):
"""
Remove the key from its current position and set it at the beginning of the queue.
:param key: string key used in the get operation
"""
self.dq.remove(key)
self.dq.appendleft(key)

async def post_set(self, key, value):
"""
Set the given key at the beginning of the queue. If the queue is full, remove the last
item first.
:param key: string key used in the set operation
:param value: obj used in the set operation
"""
if len(self.dq) == self.dq.maxlen:
await self.client.delete(self.dq.pop())
self.dq.appendleft(key)
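The ``LRUPolicy`` keeps its bookkeeping in a ``deque``: ``post_get`` moves the accessed key to the front, and ``post_set`` pops the least recently used key off the back (and deletes it from the client) before inserting the new one. A standalone, synchronous sketch of that eviction logic over a plain dict (``MiniLRU`` is an illustrative stand-in, not the aiocache class):

```python
from collections import deque


class MiniLRU:
    """Standalone sketch of LRUPolicy's deque bookkeeping over a plain dict."""

    def __init__(self, max_keys):
        assert max_keys >= 1, "Number of keys must be 1 or bigger"
        self.dq = deque(maxlen=max_keys)
        self.store = {}

    def get(self, key):
        value = self.store.get(key)
        if value is not None:
            # post_get: move the key to the front (most recently used).
            self.dq.remove(key)
            self.dq.appendleft(key)
        return value

    def set(self, key, value):
        # post_set: evict the least recently used key when the queue is full.
        if len(self.dq) == self.dq.maxlen:
            evicted = self.dq.pop()
            del self.store[evicted]
        self.dq.appendleft(key)
        self.store[key] = value


lru = MiniLRU(max_keys=2)
lru.set("a", 1)
lru.set("b", 2)
lru.get("a")          # "a" becomes most recently used
lru.set("c", 3)       # evicts "b", the least recently used
print(sorted(lru.store))  # ['a', 'c']
```

Note the explicit ``pop``-then-``appendleft``: although a bounded ``deque`` silently discards from the opposite end on its own, the policy must pop explicitly so it can tell the client to delete the evicted key from storage.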
4 changes: 2 additions & 2 deletions aiocache/utils.py
@@ -8,7 +8,7 @@
logger = logging.getLogger(__name__)


def cached(ttl=0, backend=None, serializer=None, *args, **kwargs):
def cached(*args, ttl=0, backend=None, serializer=None, **kwargs):
"""
Caches the functions return value into a key generated with module_name, function_name and args.
@@ -37,7 +37,7 @@ async def wrapper(*args, **kwargs):
return cached_decorator


def get_default_cache(backend=None, serializer=None, *args, **kwargs):
def get_default_cache(*args, backend=None, serializer=None, **kwargs):
serializer = serializer if serializer else DefaultSerializer()
if backend:
return backend(serializer=serializer, *args, **kwargs)
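The ``utils.py`` change (like the ``RedisCache.__init__`` change above) moves ``*args`` ahead of the keyword parameters, so stray positional arguments are no longer silently captured by ``ttl``, ``backend`` or ``serializer``. A minimal illustration of the difference (function names here are invented for the demo):

```python
def old_style(ttl=0, backend=None, *args, **kwargs):
    # Positional arguments fill ttl and backend first.
    return ttl, args


def new_style(*args, ttl=0, backend=None, **kwargs):
    # ttl and backend become keyword-only; positionals go to *args.
    return ttl, args


print(old_style(10))  # (10, ())   -- 10 was swallowed as ttl
print(new_style(10))  # (0, (10,)) -- ttl keeps its default
```

With the new ordering, callers must write ``cached(ttl=10)`` explicitly, which is less error-prone for a decorator factory.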
2 changes: 1 addition & 1 deletion docs/backends.rst
@@ -1,7 +1,7 @@
Backends
========

You can use different backends according to your needs. All the backends implement the same interface, which includes the methods ``get``, ``set``, ``multi_get``, ``multi_set``, ``delete``. If you feel a method is really missing here, do not hesitate to open an issue on GitHub.
You can use different backends according to your needs. All the backends implement the same interface, which includes the methods ``add``, ``get``, ``set``, ``multi_get``, ``multi_set``, ``delete``, ``exists``. If you feel a method is really missing here, do not hesitate to open an issue on GitHub.

Backends always work through a serializer. The serializer transforms the data when it is stored in and retrieved from the storage. This, for example, allows storing Python classes in Redis, which by default only supports strings, ints and bytes. As you may have guessed, this has a downside: in some cases the data won't be readable raw in the storage, since the serializer may transform it before storing. To give an idea, the set operation on any backend works as follows:

Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ Contents

backends
serializers
policies
decorators

Indices and tables
27 changes: 27 additions & 0 deletions docs/policies.rst
@@ -0,0 +1,27 @@
Policies
========

Policies can be used to change the behavior of the cache. By default, any backend uses :class:`aiocache.policies.DefaultPolicy`, which does nothing. You can select a policy by calling ``cache.set_policy(MyPolicy)``.


DefaultPolicy
-------------

.. autoclass:: aiocache.policies.DefaultPolicy
:members:
:undoc-members:


LRUPolicy
---------

.. autoclass:: aiocache.policies.LRUPolicy
:members:
:undoc-members:


An example usage of the policy:

.. literalinclude:: ../examples/policy.py
:language: python
:linenos:
