Permalink
Browse files

Merge pull request #10 from wgen/master

Request update to support configuring behaviors for pylibmc client
  • Loading branch information...
2 parents e41fc3a + 4d68724 commit c2a7892977b57518e311f3282cbf0d32fad6e422 @bbangert committed Apr 27, 2012
Showing with 122 additions and 7 deletions.
  1. +28 −5 beaker/ext/memcached.py
  2. +57 −0 beaker/util.py
  3. +37 −2 tests/test_memcached.py
View
@@ -3,7 +3,7 @@
from beaker.crypto.util import sha1
from beaker.exceptions import InvalidCacheBackendError, MissingCacheParameter
from beaker.synchronization import file_synchronizer
-from beaker.util import verify_directory, SyncDict
+from beaker.util import verify_directory, SyncDict, parse_memcached_behaviors
import warnings
MAX_KEY_LENGTH = 250
@@ -54,6 +54,11 @@ def _auto():
return clib
+def _is_configured_for_pylibmc(memcache_module_config, memcache_client):
+ return memcache_module_config == 'pylibmc' or \
+ memcache_client.__name__.startswith('pylibmc')
+
+
class MemcachedNamespaceManager(NamespaceManager):
"""Provides the :class:`.NamespaceManager` API over a memcache client library."""
@@ -64,8 +69,7 @@ def __new__(cls, *args, **kw):
memcache_client = _load_client(memcache_module)
- if memcache_module == 'pylibmc' or \
- memcache_client.__name__.startswith('pylibmc'):
+ if _is_configured_for_pylibmc(memcache_module, memcache_client):
return object.__new__(PyLibMCNamespaceManager)
else:
return object.__new__(MemcachedNamespaceManager)
@@ -88,7 +92,11 @@ def __init__(self, namespace, url,
if self.lock_dir:
verify_directory(self.lock_dir)
- self.mc = MemcachedNamespaceManager.clients.get(
+ # Check for pylibmc namespace manager, in which case client will be
+ # instantiated by subclass __init__, to handle behavior passing to the
+ # pylibmc client
+ if not _is_configured_for_pylibmc(memcache_module, _memcache_module):
+ self.mc = MemcachedNamespaceManager.clients.get(
(memcache_module, url),
_memcache_module.Client,
url.split(';'))
@@ -138,8 +146,23 @@ def keys(self):
class PyLibMCNamespaceManager(MemcachedNamespaceManager):
"""Provide thread-local support for pylibmc."""
- def __init__(self, *arg, **kw):
+ def __init__(self, *arg, **kw):
super(PyLibMCNamespaceManager, self).__init__(*arg, **kw)
+
+ memcache_module = kw.get('memcache_module', 'auto')
+ _memcache_module = _client_libs[memcache_module]
+ protocol = kw.get('protocol', 'text')
+ username = kw.get('username', None)
+ password = kw.get('password', None)
+ url = kw.get('url')
+ behaviors = parse_memcached_behaviors(kw)
+
+ self.mc = MemcachedNamespaceManager.clients.get(
+ (memcache_module, url),
+ _memcache_module.Client,
+ servers=url.split(';'), behaviors=behaviors,
+ binary=(protocol=='binary'), username=username,
+ password=password)
self.pool = pylibmc.ThreadMappedPool(self.mc)
def __getitem__(self, key):
View
@@ -330,6 +330,48 @@ def coerce_cache_params(params):
]
return verify_rules(params, rules)
+def coerce_memcached_behaviors(behaviors):
+ rules = [
+ ('cas', (bool, int), 'cas must be a boolean or an integer'),
+ ('no_block', (bool, int), 'no_block must be a boolean or an integer'),
+ ('receive_timeout', (int,), 'receive_timeout must be an integer'),
+ ('send_timeout', (int,), 'send_timeout must be an integer'),
+ ('ketama_hash', (str,), 'ketama_hash must be a string designating '
+ 'a valid hashing strategy option'),
+ ('_poll_timeout', (int,), '_poll_timeout must be an integer'),
+ ('auto_eject', (bool, int), 'auto_eject must be an integer'),
+ ('retry_timeout', (int,), 'retry_timeout must be an integer'),
+ ('_sort_hosts', (bool, int), '_sort_hosts must be an integer'),
+ ('_io_msg_watermark', (int,), '_io_msg_watermark must be an integer'),
+ ('ketama', (bool, int), 'ketama must be a boolean or an integer'),
+ ('ketama_weighted', (bool, int), 'ketama_weighted must be a boolean or '
+ 'an integer'),
+ ('_io_key_prefetch', (int, bool), '_io_key_prefetch must be a boolean '
+ 'or an integer'),
+ ('_hash_with_prefix_key', (bool, int), '_hash_with_prefix_key must be '
+ 'a boolean or an integer'),
+ ('tcp_nodelay', (bool, int), 'tcp_nodelay must be a boolean or an '
+ 'integer'),
+ ('failure_limit', (int,), 'failure_limit must be an integer'),
+ ('buffer_requests', (bool, int), 'buffer_requests must be a boolean '
+ 'or an integer'),
+ ('_socket_send_size', (int,), '_socket_send_size must be an integer'),
+ ('num_replicas', (int,), 'num_replicas must be an integer'),
+ ('remove_failed', (int,), 'remove_failed must be an integer'),
+ ('_noreply', (bool, int), '_noreply must be a boolean or an integer'),
+ ('_io_bytes_watermark', (int,), '_io_bytes_watermark must be an '
+ 'integer'),
+ ('_socket_recv_size', (int,), '_socket_recv_size must be an integer'),
+ ('distribution', (str,), 'distribution must be a string designating '
+ 'a valid distribution option'),
+ ('connect_timeout', (int,), 'connect_timeout must be an integer'),
+ ('hash', (str,), 'hash must be a string designating a valid hashing '
+ 'option'),
+ ('verify_keys', (bool, int), 'verify_keys must be a boolean or an integer'),
+ ('dead_timeout', (int,), 'dead_timeout must be an integer')
+ ]
+ return verify_rules(behaviors, rules)
+
def parse_cache_config_options(config, include_defaults=True):
"""Parse configuration options and validate for use with the
@@ -376,6 +418,21 @@ def parse_cache_config_options(config, include_defaults=True):
options['cache_regions'] = region_configs
return options
+
+def parse_memcached_behaviors(config):
+ """Parse behavior options and validate for use with pylibmc
+ client/PylibMCNamespaceManager, or potentially other memcached
+ NamespaceManagers that support behaviors"""
+ behaviors = {}
+
+ for key, val in config.iteritems():
+ if key.startswith('behavior.'):
+ behaviors[key[9:]] = val
+
+ coerce_memcached_behaviors(behaviors)
+ return behaviors
+
+
def func_namespace(func):
"""Generates a unique namespace for a function"""
kls = None
View
@@ -2,9 +2,10 @@
import mock
import os
-from beaker.cache import clsmap, Cache, util
+from beaker.cache import clsmap, Cache, CacheManager, util
from beaker.middleware import CacheMiddleware, SessionMiddleware
from beaker.exceptions import InvalidCacheBackendError
+from beaker.util import parse_cache_config_options
from nose import SkipTest
import unittest
@@ -278,7 +279,8 @@ def test_dont_use_pylibmc_client(self):
assert isinstance(cache.namespace, memcached.MemcachedNamespaceManager)
def test_client(self):
- cache = Cache('test', data_dir='./cache', url=mc_url, type="ext:memcached")
+ cache = Cache('test', data_dir='./cache', url=mc_url, type="ext:memcached",
+ protocol='binary')
o = object()
cache.set_value("test", o)
assert cache.has_key("test")
@@ -287,3 +289,36 @@ def test_client(self):
assert "foo" not in cache
cache.remove_value("test")
assert not cache.has_key("test")
+
+ def test_client_behaviors(self):
+ config = {
+ 'cache.lock_dir':'./lock',
+ 'cache.data_dir':'./cache',
+ 'cache.type':'ext:memcached',
+ 'cache.url':mc_url,
+ 'cache.memcache_module':'pylibmc',
+ 'cache.protocol':'binary',
+ 'cache.behavior.ketama': 'True',
+ 'cache.behavior.cas':False,
+ 'cache.behavior.receive_timeout':'3600',
+ 'cache.behavior.send_timeout':1800,
+ 'cache.behavior.tcp_nodelay':1,
+ 'cache.behavior.auto_eject':"0"
+ }
+ cache_manager = CacheManager(**parse_cache_config_options(config))
+ cache = cache_manager.get_cache('test_behavior', expire=6000)
+
+ with cache.namespace.pool.reserve() as mc:
+ assert "ketama" in mc.behaviors
+ assert mc.behaviors["ketama"] == 1
+ assert "cas" in mc.behaviors
+ assert mc.behaviors["cas"] == 0
+ assert "receive_timeout" in mc.behaviors
+ assert mc.behaviors["receive_timeout"] == 3600
+ assert "send_timeout" in mc.behaviors
+ assert mc.behaviors["send_timeout"] == 1800
+ assert "tcp_nodelay" in mc.behaviors
+ assert mc.behaviors["tcp_nodelay"] == 1
+ assert "auto_eject" in mc.behaviors
+ assert mc.behaviors["auto_eject"] == 0
+

0 comments on commit c2a7892

Please sign in to comment.