/
limits.py
143 lines (111 loc) · 3.61 KB
/
limits.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
"""
from six import add_metaclass
from functools import total_ordering
def safe_string(value):
"""
consistently converts a value to a string
:param value:
:return: str
"""
if isinstance(value, bytes):
return value.decode()
return str(value)
TIME_TYPES = dict(
day=(60 * 60 * 24, "day"),
month=(60 * 60 * 24 * 30, "month"),
year=(60 * 60 * 24 * 30 * 12, "year"),
hour=(60 * 60, "hour"),
minute=(60, "minute"),
second=(1, "second")
)
GRANULARITIES = {}
class RateLimitItemMeta(type):
def __new__(cls, name, parents, dct):
granularity = super(RateLimitItemMeta,
cls).__new__(cls, name, parents, dct)
if 'granularity' in dct:
GRANULARITIES[dct['granularity'][1]] = granularity
return granularity
# pylint: disable=no-member
@add_metaclass(RateLimitItemMeta)
@total_ordering
class RateLimitItem(object):
"""
defines a Rate limited resource which contains the characteristic
namespace, amount and granularity multiples of the rate limiting window.
:param int amount: the rate limit amount
:param int multiples: multiple of the 'per' granularity (e.g. 'n' per 'm' seconds)
:param string namespace: category for the specific rate limit
"""
__metaclass__ = RateLimitItemMeta
__slots__ = ["namespace", "amount", "multiples", "granularity"]
def __init__(self, amount, multiples=1, namespace='LIMITER'):
self.namespace = namespace
self.amount = int(amount)
self.multiples = int(multiples or 1)
@classmethod
def check_granularity_string(cls, granularity_string):
"""
checks if this instance matches a granularity string
of type 'n per hour' etc.
:return: True/False
"""
return granularity_string.lower() in cls.granularity[1:]
def get_expiry(self):
"""
:return: the size of the window in seconds.
"""
return self.granularity[0] * self.multiples
def key_for(self, *identifiers):
"""
:param identifiers: a list of strings to append to the key
:return: a string key identifying this resource with
each identifier appended with a '/' delimiter.
"""
remainder = "/".join([safe_string(k) for k in identifiers] + [
safe_string(self.amount),
safe_string(self.multiples), self.granularity[1]
])
return "%s/%s" % (self.namespace, remainder)
def __eq__(self, other):
return (
self.amount == other.amount
and self.granularity == other.granularity
)
def __repr__(self):
return "%d per %d %s" % (
self.amount, self.multiples, self.granularity[1]
)
def __lt__(self, other):
return self.granularity[0] < other.granularity[0]
class RateLimitItemPerYear(RateLimitItem):
"""
per year rate limited resource.
"""
granularity = TIME_TYPES["year"]
class RateLimitItemPerMonth(RateLimitItem):
"""
per month rate limited resource.
"""
granularity = TIME_TYPES["month"]
class RateLimitItemPerDay(RateLimitItem):
"""
per day rate limited resource.
"""
granularity = TIME_TYPES["day"]
class RateLimitItemPerHour(RateLimitItem):
"""
per hour rate limited resource.
"""
granularity = TIME_TYPES["hour"]
class RateLimitItemPerMinute(RateLimitItem):
"""
per minute rate limited resource.
"""
granularity = TIME_TYPES["minute"]
class RateLimitItemPerSecond(RateLimitItem):
"""
per second rate limited resource.
"""
granularity = TIME_TYPES["second"]