-
Notifications
You must be signed in to change notification settings - Fork 1
/
ratelimiter.py
executable file
·81 lines (65 loc) · 2.15 KB
/
ratelimiter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""aiohttp rate limiting: limit connections per timeframe to host
(from https://quentin.pradet.me/blog/how-do-you-rate-limit-calls-with-aiohttp.html)
"""
import asyncio
import time
from typing import Any
from urllib.parse import urlsplit
from aiohttp.client import _RequestContextManager
class RateLimiter:
"""
Use like this:
session = RateLimiter(session)
...
async with await session.get(url) as response
Args:
session (aiohttp.ClientSession): aiohttp session to use for requests
"""
RATE = 9 / 60 # requests per second
MAX_TOKENS = 10
def __init__(self, session):
self.session = session
self.start_time = time.monotonic()
self.tokens = dict()
async def get(
self, url: str, *args: Any, **kwargs: Any
) -> _RequestContextManager:
"""Asynchronous code to download a resource after waiting for a token
Args:
url (str): Url to download
*args
**kwargs
Returns:
aiohttp.ClientResponse: Response from request
"""
host = urlsplit(url).netloc
await self.wait_for_token(host)
return self.session.get(url, *args, **kwargs)
async def wait_for_token(self, host: str) -> None:
"""Asynchronous code to handle sleeping if host already connected to
Args:
host (str): Host (server)
Returns:
None
"""
if host not in self.tokens:
self.tokens[host] = [self.MAX_TOKENS, self.start_time]
while self.tokens[host][0] < 1:
self.add_new_tokens(host)
await asyncio.sleep(1)
self.tokens[host][0] -= 1
def add_new_tokens(self, host: str) -> None:
"""Adds new tokens
Args:
host (str): Host (server)
Returns:
None
"""
now = time.monotonic()
time_since_update = now - self.tokens[host][1]
new_tokens = time_since_update * self.RATE
if new_tokens > 1:
self.tokens[host][0] = min(
self.tokens[host][0] + new_tokens, self.MAX_TOKENS
)
self.tokens[host][1] = now