/
cache.py
159 lines (119 loc) · 4.57 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""Memcached and in-memory cache result backend."""
from kombu.utils.encoding import bytes_to_str, ensure_bytes
from kombu.utils.objects import cached_property
from celery.exceptions import ImproperlyConfigured
from celery.utils.functional import LRUCache
from .base import KeyValueStoreBackend
# Public API of this module.
__all__ = ('CacheBackend',)

# One-slot memo for import_best_memcache(): stays ``None`` until the first
# successful import, then holds the ``(is_pylibmc, module, key_t)`` tuple.
_imp = [None]

# Error text used when neither memcached client library can be imported.
REQUIRES_BACKEND = """\
The Memcached backend requires either pylibmc or python-memcached.\
"""

# Error text template: ``{0}`` is the requested backend name and ``{1}`` the
# comma-separated list of known backend names.
UNKNOWN_BACKEND = """\
The cache backend {0!r} is unknown,
Please use one of the following backends instead: {1}\
"""
def import_best_memcache():
    """Import a memcached client library, preferring ``pylibmc``.

    The outcome is memoized in the module-level ``_imp`` slot, so the
    imports are attempted only until one of them succeeds.

    Returns:
        A ``(is_pylibmc, memcache_module, key_t)`` tuple, where ``key_t``
        is ``bytes_to_str``.

    Raises:
        ImproperlyConfigured: If neither ``pylibmc`` nor ``memcache``
            can be imported.
    """
    if _imp[0] is not None:
        return _imp[0]

    try:
        import pylibmc as memcache
    except ImportError:
        # Fall back to python-memcached when pylibmc is not installed.
        try:
            import memcache  # noqa
        except ImportError:
            raise ImproperlyConfigured(REQUIRES_BACKEND)
        is_pylibmc = False
    else:
        is_pylibmc = True

    _imp[0] = (is_pylibmc, memcache, bytes_to_str)
    return _imp[0]
def get_best_memcache(*args, **kwargs):
    """Return ``(Client, key_t)`` for the best available memcached library.

    The positional and keyword arguments are accepted but not used.
    When the imported library is not ``pylibmc``, the returned client
    factory drops the ``behaviors`` keyword argument before delegating to
    the library's own ``Client``.
    """
    # pylint: disable=unpacking-non-sequence
    # This is most definitely a sequence, but pylint thinks it's not.
    is_pylibmc, memcache, key_t = import_best_memcache()
    library_client = memcache.Client

    if is_pylibmc:
        return library_client, key_t

    def Client(*args, **kwargs):  # noqa
        # Strip the ``behaviors`` option before calling the non-pylibmc client.
        kwargs.pop('behaviors', None)
        return library_client(*args, **kwargs)

    return Client, key_t
class DummyClient:
    """In-process stand-in for a memcached client, backed by an LRU cache."""

    def __init__(self, *args, **kwargs):
        """Create the backing store; all arguments are ignored."""
        self.cache = LRUCache(limit=5000)

    def get(self, key, *args, **kwargs):
        """Return the value stored under ``key`` via the store's ``get``."""
        store = self.cache
        return store.get(key)

    def get_multi(self, keys):
        """Return a dict of the given ``keys`` that are present in the store."""
        store = self.cache
        found = {}
        for key in keys:
            if key in store:
                found[key] = store[key]
        return found

    def set(self, key, value, *args, **kwargs):
        """Store ``value`` under ``key``; extra arguments are ignored."""
        store = self.cache
        store[key] = value

    def delete(self, key, *args, **kwargs):
        """Remove ``key`` from the store if it is present."""
        store = self.cache
        store.pop(key, None)

    def incr(self, key, delta=1):
        """Delegate to the store's ``incr`` and return its result."""
        store = self.cache
        return store.incr(key, delta)

    def touch(self, key, expire):
        """Do nothing; present so callers can invoke ``touch``."""
# Registry mapping a backend name to a factory. CacheBackend.__init__ calls
# the factory with no arguments and unpacks the result as
# ``(Client, key_t)``. All three memcached aliases share one factory;
# 'memory' selects the in-process DummyClient.
backends = {
    'memcache': get_best_memcache,
    'memcached': get_best_memcache,
    'pylibmc': get_best_memcache,
    'memory': lambda: (DummyClient, ensure_bytes),
}
class CacheBackend(KeyValueStoreBackend):
    """Cache result backend.

    Stores values through a client chosen from the module-level
    ``backends`` registry: a memcached client or the in-memory
    ``DummyClient``.
    """

    # List of server strings parsed from the backend URL; ``None`` until
    # ``__init__`` has parsed a non-empty backend string.
    servers = None
    supports_autoexpire = True
    supports_native_join = True
    implements_incr = True

    def __init__(self, app, expires=None, backend=None,
                 options=None, url=None, **kwargs):
        """Configure the backend and select the client factory.

        Arguments:
            app: Application object; ``app.conf.cache_backend`` and
                ``app.conf.cache_backend_options`` are read from it.
            expires: Expiry value, normalized with ``prepare_expires``.
            backend: Backend string such as ``name://server1;server2/``.
                Used when ``url`` is not given.
            options: Extra client options, layered on top of
                ``app.conf.cache_backend_options``.
            url: Backend string; takes precedence over ``backend``.
            **kwargs: Forwarded to the base class initializer.

        Raises:
            ImproperlyConfigured: If the backend name is not in the
                ``backends`` registry.
        """
        options = {} if not options else options
        super().__init__(app, **kwargs)
        self.url = url

        self.options = dict(self.app.conf.cache_backend_options,
                            **options)

        self.backend = url or backend or self.app.conf.cache_backend
        if self.backend:
            # Split ``name://a;b/`` into the backend name and server list.
            self.backend, _, servers = self.backend.partition('://')
            self.servers = servers.rstrip('/').split(';')
        self.expires = self.prepare_expires(expires, type=int)

        # Guard only the registry lookup: a KeyError raised while the
        # factory itself runs must not be reported as an unknown backend.
        try:
            client_factory = backends[self.backend]
        except KeyError:
            raise ImproperlyConfigured(UNKNOWN_BACKEND.format(
                self.backend, ', '.join(backends)))
        self.Client, self.key_t = client_factory()
        self._encode_prefixes()  # re-encode the key prefixes

    def get(self, key):
        """Return the client's value for ``key``."""
        return self.client.get(key)

    def mget(self, keys):
        """Return the client's ``get_multi`` result for ``keys``."""
        return self.client.get_multi(keys)

    def set(self, key, value):
        """Store ``value`` under ``key``, passing ``self.expires`` along."""
        return self.client.set(key, value, self.expires)

    def delete(self, key):
        """Delete ``key`` through the client."""
        return self.client.delete(key)

    def _apply_chord_incr(self, header_result, body, **kwargs):
        """Seed the chord counter with 0, then defer to the base class."""
        chord_key = self.get_key_for_chord(header_result.id)
        self.client.set(chord_key, 0, time=self.expires)
        return super()._apply_chord_incr(
            header_result, body, **kwargs)

    def incr(self, key):
        """Increment ``key`` through the client and return its result."""
        return self.client.incr(key)

    def expire(self, key, value):
        """Forward ``key`` and ``value`` to the client's ``touch``."""
        return self.client.touch(key, value)

    @cached_property
    def client(self):
        """Client instance built from ``servers`` and ``options``."""
        return self.Client(self.servers, **self.options)

    def __reduce__(self, args=(), kwargs=None):
        """Return pickling state carrying backend URI, expiry and options."""
        # Copy so that a mapping supplied by the caller is never mutated.
        kwargs = dict(kwargs) if kwargs else {}
        servers = ';'.join(self.servers)
        backend = f'{self.backend}://{servers}/'
        kwargs.update(
            {'backend': backend,
             'expires': self.expires,
             'options': self.options})
        return super().__reduce__(args, kwargs)

    def as_uri(self, *args, **kwargs):
        """Return the backend as an URI.

        This properly handles the case of multiple servers.
        """
        servers = ';'.join(self.servers)
        return f'{self.backend}://{servers}/'