Skip to content

Commit

Permalink
WIP: mongodb storage
Browse files Browse the repository at this point in the history
  • Loading branch information
alisaifee committed May 28, 2020
1 parent 5ce9256 commit fa0fbda
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 4 deletions.
5 changes: 5 additions & 0 deletions docker-compose.yml
Expand Up @@ -94,3 +94,8 @@ services:
- type: bind
source: /tmp/
target: /sockets/
mongodb:
image: percona/percona-server-mongodb
command: --storageEngine inMemory
ports:
- '37017:27017'
2 changes: 2 additions & 0 deletions limits/storage/__init__.py
Expand Up @@ -9,6 +9,7 @@
from .redis_cluster import RedisClusterStorage
from .redis_sentinel import RedisSentinelStorage
from .memcached import MemcachedStorage
from .mongodb import MongoDBStorage
from .gae_memcached import GAEMemcachedStorage


Expand All @@ -32,6 +33,7 @@ def storage_from_string(storage_string, **options):
"storage_from_string",
"Storage",
"MemoryStorage",
"MongoDBStorage",
"RedisStorage",
"RedisClusterStorage",
"RedisSentinelStorage",
Expand Down
184 changes: 184 additions & 0 deletions limits/storage/mongodb.py
@@ -0,0 +1,184 @@
import datetime
import time

from .base import Storage
from ..errors import ConfigurationError
from ..util import get_dependency


class MongoDBStorage(Storage):
STORAGE_SCHEME = ["mongodb"]
DEFAULT_OPTIONS = {
'serverSelectionTimeoutMS': 100,
'socketTimeoutMS': 100,
'connectTimeoutMS': 100
}

def __init__(self, uri, **options):
if not get_dependency("pymongo"):
raise ConfigurationError(
"pymongo prerequisite not available"
)

mongo_opts = options.copy()
[mongo_opts.setdefault(k, v) for k, v in self.DEFAULT_OPTIONS.items()]
self.lib = get_dependency("pymongo")
self.storage = self.lib.MongoClient(uri, **mongo_opts)
self.counters = self.storage.limits_database.counters
self.windows = self.storage.limits_database.windows
self.__initialize_database()
super(MongoDBStorage, self).__init__(uri, **options)

def __initialize_database(self):
self.counters.create_index('expireAt', expireAfterSeconds=0)
self.windows.create_index('expireAt', expireAfterSeconds=0)

def reset(self):
num_keys = (
self.counters.count_documents({})
+ self.windows.count_documents({})
)
self.counters.drop()
self.windows.drop()
return num_keys

def clear(self, key):
self.counters.find_one_and_delete({"_id": key})
self.windows.find_one_and_delete({"_id": key})

def get_expiry(self, key):
counter = self.counters.find_one({"_id": key})
expiry = (
counter["expireAt"] if counter
else datetime.datetime.utcnow()
)
return int(time.mktime(expiry.timetuple()))

def get(self, key):
counter = self.counters.find_one(
{"_id": key, "expireAt": {"$gte": datetime.datetime.utcnow()}},
projection=["count"]
)
return counter and counter["count"] or 0

def incr(self, key, expiry, elastic_expiry=False):
expiration = (
datetime.datetime.utcnow()
+ datetime.timedelta(seconds=expiry)
)

return self.counters.find_one_and_update(
{"_id": key},
[
{
"$set": {
"count": {
"$cond": {
"if": {"$lt": ["$expireAt", "$$NOW"]},
"then": 1,
"else": {"$add": ["$count", 1]}
}
},
"expireAt": {
"$cond": {
"if": {"$lt": ["$expireAt", "$$NOW"]},
"then": expiration,
"else": (
expiration if elastic_expiry
else "$expireAt"
)
}
}
}
},
],
upsert=True,
projection=["count"],
return_document=self.lib.ReturnDocument.AFTER
)["count"]

def check(self):
try:
self.storage.server_info()
return True
except: # noqa: E722
return False

def get_moving_window(self, key, limit, expiry):
"""
returns the starting point and the number of entries in the moving
window
:param str key: rate limit key
:param int expiry: expiry of entry
:return: (start of window, number of acquired entries)
"""
timestamp = time.time()
result = list(
self.windows.aggregate(
[
{"$match": {"_id": key}},
{
"$project": {
"entries": {
"$filter": {
"input": "$entries",
"as": "entry",
"cond": {
"$gte": [
"$$entry",
timestamp - expiry,
]
}
}
}
}
},
{"$unwind": "$entries"},
{
"$group": {
"_id": "$_id",
"max": {"$max": "$entries"},
"count": {"$sum": 1}
}
}
]
)
)
if result:
return (int(result[0]["max"]), result[0]["count"])
return (int(timestamp), 0)

def acquire_entry(self, key, limit, expiry, no_add=False):
timestamp = time.time()
try:
updates = {
"$push": {
"entries": {
"$each": [],
"$position": 0,
"$slice": limit
}
}
}
if not no_add:
updates["$set"] = {
"expireAt": (
datetime.datetime.utcnow()
+ datetime.timedelta(seconds=expiry)
)
}
updates["$push"]["entries"]["$each"] = [timestamp]
self.windows.update_one(
{
"_id": key,
"entries.%d" % (limit - 1): {
"$not": {"$gte": timestamp - expiry}
}
},
updates,
upsert=True,
)
return True
except self.lib.errors.DuplicateKeyError:
return False
1 change: 1 addition & 0 deletions requirements/test.txt
Expand Up @@ -4,6 +4,7 @@ pytest
pytest-cov
coverage<5
pymemcache
pymongo
redis<3.1.0
redis-py-cluster>=1.1.0
PyYAML
Expand Down
23 changes: 19 additions & 4 deletions tests/test_storage.py
Expand Up @@ -2,6 +2,7 @@

import mock
import pymemcache.client
import pymongo
import pytest
import redis
import redis.sentinel
Expand All @@ -10,8 +11,9 @@

from limits.errors import ConfigurationError
from limits.storage import (
MemoryStorage, RedisStorage, MemcachedStorage, RedisSentinelStorage,
RedisClusterStorage, Storage, GAEMemcachedStorage, storage_from_string
MemoryStorage, RedisStorage, MemcachedStorage, MongoDBStorage,
RedisSentinelStorage, RedisClusterStorage, Storage, GAEMemcachedStorage,
storage_from_string
)
from limits.strategies import (
MovingWindowRateLimiter
Expand All @@ -26,6 +28,12 @@ def setUp(self):
redis.from_url('unix:///tmp/limits.redis.sock').flushall()
redis.from_url("redis://localhost:7379").flushall()
redis.from_url("redis://:sekret@localhost:7389").flushall()
pymongo.MongoClient(
"mongodb://localhost:37017"
).limit_storage.windows.drop()
pymongo.MongoClient(
"mongodb://localhost:37017"
).limit_storage.counters.drop()
redis.sentinel.Sentinel([
("localhost", 26379)
]).master_for("localhost-redis-sentinel").flushall()
Expand Down Expand Up @@ -101,6 +109,12 @@ def test_storage_string(self):
RedisClusterStorage
)
)
self.assertTrue(
isinstance(
storage_from_string("mongodb://localhost:37017/"),
MongoDBStorage
)
)
if RUN_GAE:
self.assertTrue(
isinstance(
Expand Down Expand Up @@ -163,6 +177,9 @@ def test_storage_check(self):
self.assertTrue(
storage_from_string("redis+cluster://localhost:7000").check()
)
self.assertTrue(
storage_from_string("mongodb://localhost:37017").check()
)
if RUN_GAE:
self.assertTrue(storage_from_string("gaememcached://").check())

Expand Down Expand Up @@ -221,5 +238,3 @@ def get_moving_window(self, *a, **k):
storage = storage_from_string("mystorage://")
self.assertTrue(isinstance(storage, MyStorage))
MovingWindowRateLimiter(storage)


0 comments on commit fa0fbda

Please sign in to comment.