Skip to content

ArtyomKozyrev8/BucketRateLimiter

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

25 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

License: GPL v3 UNITTESTS codecov MyPy

BucketRateLimiter

BucketRateLimiter is the collection of rate limiters, which are based on Bucket conception.

Rate Limiters are used when you want some function to be executed the concrete amount of times per time unit. The most widespread use case of Rate Limiters is when you use external APIs, e.g. website API. Some websites allows to make only a limited number of requests per second and Rate Limiters solves the issue.

How to install:

pip install bucketratelimiter

Examples:

You can find complete examples on how to use AsyncioBucketTimeRateLimiter and MThreadedBucketTimeRateLimiter in folder examples of this repository. Also you can check tests folder and get some tricks from unittests.

How to use:

Create rate limiter:
from bucketratelimiter import AsyncioBucketTimeRateLimiter

# max_size = 4 and recovery_time = 1.0
# means that we would like to limit something to 4 attempts per second
# max_size - is a size of internal bucket
# recovery_time - time to make bucket full again
# rest_time - is the time to sleep for workers which can not proceed further due to rate limiter
# callback - is a function which should be called after task will be completed
ASYNC_LIMITER = AsyncioBucketTimeRateLimiter(
    max_size=4,
    recovery_time=1.0,
    rest_time=0.2,
    callback=None,
)
from bucketratelimiter import MThreadedBucketTimeRateLimiter

ASYNC_LIMITER = MThreadedBucketTimeRateLimiter(
    max_size=4,
    recovery_time=1.0,
    rest_time=0.2,
    callback=None,
)
"Wrap" some function in ratelimiter:
import asyncio
from bucketratelimiter import AsyncioBucketTimeRateLimiter

limiter = AsyncioBucketTimeRateLimiter()

@limiter # we use decorator to limit the function to a certain number of attempts per second
async def some_func_to_limit(sleep_time: float = 1.0) -> None:
    await asyncio.sleep(sleep_time)
import time
from bucketratelimiter import MThreadedBucketTimeRateLimiter

limiter = MThreadedBucketTimeRateLimiter()

@limiter # we use decorator to limit the function to a certain number of attempts per second
def some_func_to_limit(sleep_time: float = 1.0) -> None:
    time.sleep(sleep_time)
Activate ratelimiter logic:
limiter = AsyncioBucketTimeRateLimiter()

async def main_entry_point() -> None:
    """Main entry point of our asyncio app."""
    q = asyncio.Queue()
    for task in TASKS_TO_COMPLETE:
        await q.put(task)

    # use LIMITER as context manager to ensure its correct activation and end of work
    async with limiter:
        for w in [worker(q) for _ in range(1, WORKER_NUM + 1)]:
            asyncio.create_task(w)

        await q.join()
limiter = MThreadedBucketTimeRateLimiter()

def main_entry_point() -> None:
    """Main entry point of our multithreading app."""
    q = Queue()
    for task in TASKS_TO_COMPLETE:
        q.put(task)

    # use LIMITER as context manager to ensure its correct activation and end of work
    with LIMITER:
        for _ in range(1, WORKER_NUM + 1):
            Thread(target=worker, args=(q, ), daemon=True).start()

        q.join()

HOW TO USE LOW LEVEL API:

Use without context manager:
# Use RateLimiter's method to activate and deactivate it's inner logic
# instead of using context managers
try:
    limiter.activate()
finally:
    limiter.deactivate()
Use without functions decoration:
import asyncio
from bucketratelimiter import AsyncioBucketTimeRateLimiter

limiter = AsyncioBucketTimeRateLimiter()

async def some_func_to_limit(sleep_time: float = 1.0) -> None:
    await asyncio.sleep(sleep_time)
...
async with limiter:
    await limiter.wrap_operation(some_func_to_limit, sleep_time=1.0)

# ATTENTION !
# Do not use wrap_operation to some function more than once
# Do not apply decorator to some function if you use wrap_operation
# It can lead to unexpected results
import time
from bucketratelimiter import MThreadedBucketTimeRateLimiter

limiter = MThreadedBucketTimeRateLimiter()

def some_func_to_limit(sleep_time: float = 1.0) -> None:
    time.sleep(sleep_time)
...
with limiter:
    limiter.wrap_operation(some_func_to_limit, sleep_time=1.0)

FOR CONTRIBUTORS:

Clone the project:

https://github.com/ArtyomKozyrev8/BucketRateLimiter.git
cd BucketRateLimiter

Create a new virtualenv:

python3 -m venv env
source env/bin/activate

Install all requirements:

pip install -e '.[develop]'

Run Tests:

mypy --strict bucketratelimiter

pytest --cov=bucketratelimiter tests/

coverage report

coverage html