Skip to content

Commit

Permalink
Merge pull request #4 from agoragames/redis-sentinel-support
Browse files Browse the repository at this point in the history
Redis sentinel support
  • Loading branch information
alisaifee committed Oct 5, 2015
2 parents 13be116 + b32e4cd commit 7f89a48
Show file tree
Hide file tree
Showing 8 changed files with 2,119 additions and 42 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ build/
dist/
htmlcov
*egg-info*
*.rdb
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ python:

install:
- pip install -r requirements/ci.txt --use-mirrors

services:
- redis-server
- memcached

before_script:
- redis-server ./tests/redis-configurations/redis-master.conf
- redis-server ./tests/redis-configurations/redis-slave.conf
- redis-server ./tests/redis-configurations/redis-sentinel.conf --sentinel

script: nosetests tests --with-cov -v
after_success:
- coveralls
1 change: 1 addition & 0 deletions doc/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Storage
.. autoclass:: limits.storage.Storage
.. autoclass:: limits.storage.MemoryStorage
.. autoclass:: limits.storage.RedisStorage
.. autoclass:: limits.storage.RedisSentinelStorage
.. autoclass:: limits.storage.MemcachedStorage
.. autofunction:: limits.storage.storage_from_string

Expand Down
206 changes: 167 additions & 39 deletions limits/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,7 @@ def check(self):
"""
return True


class RedisStorage(Storage):
"""
rate limit storage with redis as backend
"""

STORAGE_SCHEME = "redis"
class RedisCommon:
SCRIPT_MOVING_WINDOW = """
local items = redis.call('lrange', KEYS[1], 0, tonumber(ARGV[2]))
local expiry = tonumber(ARGV[1])
Expand All @@ -240,6 +234,87 @@ class RedisStorage(Storage):
return {oldest, a}
"""

def redis_incr(self, connection, key, expiry, elastic_expiry=False):
"""
increments the counter for a given rate limit key
:param connection: Redis connection
:param str key: the key to increment
:param int expiry: amount in seconds for the key to expire in
"""
value = connection.incr(key)
if elastic_expiry or value == 1:
connection.expire(key, expiry)
return value

def redis_get(self, connection, key):
"""
:param connection: Redis connection
:param str key: the key to get the counter value for
"""
return int(connection.get(key) or 0)

def get_moving_window(self, key, limit, expiry):
"""
returns the starting point and the number of entries in the moving window
:param str key: rate limit key
:param int expiry: expiry of entry
"""
timestamp = time.time()
window = self.lua_moving_window(
[key], [int(timestamp - expiry), limit]
)
return window or (timestamp, 0)

def redis_acquire_entry(self, connection, key, limit, expiry, no_add=False):
"""
:param connection: Redis connection
:param str key: rate limit key to acquire an entry in
:param int limit: amount of entries allowed
:param int expiry: expiry of the entry
:param bool no_add: if False an entry is not actually acquired but instead
serves as a 'check'
:return: True/False
"""
timestamp = time.time()
with self.lock_impl("%s/LOCK" % key, blocking_timeout=1):
entry = connection.lindex(key, limit - 1)
if entry and float(entry) >= timestamp - expiry:
return False
else:
if not no_add:
with connection.pipeline(transaction=False) as pipeline:
pipeline.lpush(key, timestamp)
pipeline.ltrim(key, 0, limit - 1)
pipeline.expire(key, expiry)
pipeline.execute()
return True

def redis_get_expiry(self, connection, key):
"""
:param connection: Redis connection
:param str key: the key to get the expiry for
"""
return int((connection.ttl(key) or 0) + time.time())

def redis_check(self, connection):
"""
:param connection: Redis connection
check if storage is healthy
"""
try:
return connection.ping()
except: # noqa
return False

class RedisStorage(Storage, RedisCommon):
"""
rate limit storage with redis as backend
"""

STORAGE_SCHEME = "redis"

def __init__(self, uri, **_):
"""
:param str redis_url: url of the form 'redis://host:port'
Expand All @@ -256,7 +331,7 @@ def initialize_storage(self, uri):
if not self.storage.ping():
raise ConfigurationError("unable to connect to redis at %s" % uri) # pragma: no cover
self.lua_moving_window = self.storage.register_script(
RedisStorage.SCRIPT_MOVING_WINDOW
RedisCommon.SCRIPT_MOVING_WINDOW
)
self.lock_impl = self.storage.lock

Expand All @@ -267,16 +342,13 @@ def incr(self, key, expiry, elastic_expiry=False):
:param str key: the key to increment
:param int expiry: amount in seconds for the key to expire in
"""
value = self.storage.incr(key)
if elastic_expiry or value == 1:
self.storage.expire(key, expiry)
return value
return self.redis_incr(self.storage, key, expiry, elastic_expiry)

def get(self, key):
"""
:param str key: the key to get the counter value for
"""
return int(self.storage.get(key) or 0)
return self.redis_get(self.storage, key)

def acquire_entry(self, key, limit, expiry, no_add=False):
"""
Expand All @@ -287,47 +359,103 @@ def acquire_entry(self, key, limit, expiry, no_add=False):
serves as a 'check'
:return: True/False
"""
timestamp = time.time()
with self.lock_impl("%s/LOCK" % key, blocking_timeout=1):
entry = self.storage.lindex(key, limit - 1)
if entry and float(entry) >= timestamp - expiry:
return False
else:
if not no_add:
with self.storage.pipeline(transaction=False) as pipeline:
pipeline.lpush(key, timestamp)
pipeline.ltrim(key, 0, limit - 1)
pipeline.expire(key, expiry)
pipeline.execute()
return True
return self.redis_acquire_entry(self.storage, key, limit, expiry, no_add)

def get_moving_window(self, key, limit, expiry):
def get_expiry(self, key):
"""
returns the starting point and the number of entries in the moving window
:param str key: the key to get the expiry for
"""
return self.redis_get_expiry(self.storage, key)

:param str key: rate limit key
:param int expiry: expiry of entry
def check(self):
"""
timestamp = time.time()
window = self.lua_moving_window(
[key], [int(timestamp - expiry), limit]
check if storage is healthy
"""
return self.redis_check(self.storage)


class RedisSentinelStorage(Storage, RedisCommon):
"""
rate limit storage with redis sentinel as backend
"""

STORAGE_SCHEME = "redis+sentinel"

def __init__(self, uri, **options):
"""
:raise ConfigurationError: when the redis library is not available
or if the redis master host cannot be pinged.
"""
if not get_dependency("redis"):
raise ConfigurationError("redis prerequisite not available") # pragma: no cover

parsed = urllib.parse.urlparse(uri)
self.sentinel_configuration = []
for loc in parsed.netloc.split(","):
host, port = loc.split(":")
self.sentinel_configuration.append((host, int(port)))

self.sentinel = get_dependency("redis.sentinel").Sentinel(
self.sentinel_configuration,
socket_timeout=options.get("socket_timeout", 0.2)
)
return window or (timestamp, 0)
self.service_name = options.get("service_name")
self.initialize_storage()
super(RedisSentinelStorage, self).__init__()

def initialize_storage(self):
master = self.sentinel.master_for(self.service_name)
if not master.ping():
raise ConfigurationError("unable to connect to redis at %s" % self.sentinel) # pragma: no cover
self.lua_moving_window = master.register_script(
RedisCommon.SCRIPT_MOVING_WINDOW
)
self.lock_impl = master.lock


def incr(self, key, expiry, elastic_expiry=False):
"""
increments the counter for a given rate limit key
:param str key: the key to increment
:param int expiry: amount in seconds for the key to expire in
"""
master = self.sentinel.master_for(self.service_name)
return self.redis_incr(master, key, expiry, elastic_expiry)

def get(self, key):
"""
:param str key: the key to get the counter value for
"""
slave = self.sentinel.slave_for(self.service_name)
return self.redis_get(slave, key)

def acquire_entry(self, key, limit, expiry, no_add=False):
"""
:param str key: rate limit key to acquire an entry in
:param int limit: amount of entries allowed
:param int expiry: expiry of the entry
:param bool no_add: if False an entry is not actually acquired but instead
serves as a 'check'
:return: True/False
"""
master = self.sentinel.master_for(self.service_name)
return self.redis_acquire_entry(master, key, limit, expiry, no_add)

def get_expiry(self, key):
"""
:param str key: the key to get the expiry for
"""
return int((self.storage.ttl(key) or 0) + time.time())
slave = self.sentinel.slave_for(self.service_name)
return self.redis_get_expiry(slave, key)

def check(self):
"""
check if storage is healthy
"""
try:
return self.storage.ping()
except: # noqa
return False
slave = self.sentinel.slave_for(self.service_name)
return self.redis_check(slave)


class MemcachedStorage(Storage):
"""
Expand Down

0 comments on commit 7f89a48

Please sign in to comment.