Skip to content

Commit

Permalink
Moved to rate limited .get
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnPaton committed Aug 19, 2018
1 parent a4d5eb5 commit a2d709f
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 88 deletions.
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
![Build Status](https://travis-ci.com/JohnPaton/ratelimitqueue.svg?branch=master)](https://travis-ci.com/JohnPaton/ratelimitqueue) [![Coverage Status](https://coveralls.io/repos/github/JohnPaton/ratelimitqueue/badge.svg)](https://coveralls.io/github/JohnPaton/ratelimitqueue) [![Documentation Status](https://readthedocs.org/projects/ratelimitqueue/badge/?version=latest)](https://ratelimitqueue.readthedocs.io/en/latest/?badge=latest)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)

# RateLimitQueue

A rate limited wrapper for Python's thread safe queues.

Some external APIs have rate limits that allow faster-than-consecutive queries, e.g. if the rate limit is very high or the API response is quite slow. To make the most of the API, the best option is to make API calls from multiple threads. Then you can put the requests or URLs to call in a `queue.Queue` and have the threads consume the URLs as they make the calls. However, you still have to make sure that the total calls from all your threads don't exceed the rate limit.
Some external APIs have rate limits that allow faster-than-consecutive queries, e.g. if the rate limit is very high or the API response is quite slow. To make the most of the API, the best option is often to make API calls from multiple threads. Then you can put the requests or URLs to call in a `queue.Queue` and have the threads consume the URLs as they make the calls. However, you still have to make sure that the total calls from all your threads don't exceed the rate limit, which requires some nontrivial coordination.

The `ratelimitqueue` package extends the three built-in Python from from `queue` module - `Queue`, `LifeQueue`, and `PriorityQueue` - with configurable, rate limited counterparts. Specifically, the `get()` method is rate limited across all threads so that workers can safely consume from the queue in a `while queue.not_empty`, and putting the items in the queue doesn't need to require blocking the main thread.
The `ratelimitqueue` package extends the three built-in Python queues from from `queue` package - `Queue`, `LifeQueue`, and `PriorityQueue` - with configurable, rate limited counterparts. Specifically, the `get()` method is rate limited across all threads so that workers can safely consume from the queue in an unlimited loop, and putting the items in the queue doesn't need to require blocking the main thread.

## Installation

To get started, clone this repository and install it with `pip`:

```bash
git clone git@github.com:JohnPaton/ratelimitqueue.git
pip install ratelimitqueue
$ git clone https://github.com/JohnPaton/ratelimitqueue.git
$ cd ratelimitqueue
$ pip install .
```

## Basic Usage
<!-- ## Basic Usage
The most basic usage is to rate limit calls in the main thread
The most basic usage is to rate limit calls in the main thread -->
2 changes: 1 addition & 1 deletion docs/source/ratelimitqueue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ratelimitqueue.exceptions module
ratelimitqueue.ratelimitqueue module
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: ratelimitqueue.ratelimitqueue.RateLimitPutMixin
.. autoclass:: ratelimitqueue.ratelimitqueue.RateLimitGetMixin
:members:


Expand Down
33 changes: 27 additions & 6 deletions docs/source/readme.rst
Original file line number Diff line number Diff line change
@@ -1,25 +1,46 @@
|Build Status| |Coverage Status| |Documentation Status| |Code style:
black|
|Build Status|](https://travis-ci.com/JohnPaton/ratelimitqueue)
|Coverage Status| |Documentation Status| |Code style: black|

RateLimitQueue
==============

A thread safe, rate limited Python queue.
A rate limited wrapper for Python鈥檚 thread safe queues.

Some external APIs have rate limits that allow faster-than-consecutive
queries, e.g.聽if the rate limit is very high or the API response is
quite slow. To make the most of the API, the best option is often to
make API calls from multiple threads. Then you can put the requests or
URLs to call in a ``queue.Queue`` and have the threads consume the URLs
as they make the calls. However, you still have to make sure that the
total calls from all your threads don鈥檛 exceed the rate limit, which
requires some nontrivial coordination.

The ``ratelimitqueue`` package extends the three built-in Python queues
from from ``queue`` package - ``Queue``, ``LifeQueue``, and
``PriorityQueue`` - with configurable, rate limited counterparts.
Specifically, the ``get()`` method is rate limited across all threads so
that workers can safely consume from the queue in an unlimited loop, and
putting the items in the queue doesn鈥檛 need to require blocking the main
thread.

Installation
------------

To install ``ratelimitqueue``, clone the repository and install with
``pip``:
To get started, clone this repository and install it with ``pip``:

.. code:: bash
$ git clone https://github.com/JohnPaton/ratelimitqueue.git
$ cd ratelimitqueue
$ pip install .
.. raw:: html

<!-- ## Basic Usage
The most basic usage is to rate limit calls in the main thread -->

.. |Build Status| image:: https://travis-ci.com/JohnPaton/ratelimitqueue.svg?branch=master
:target: https://travis-ci.com/JohnPaton/ratelimitqueue
.. |Coverage Status| image:: https://coveralls.io/repos/github/JohnPaton/ratelimitqueue/badge.svg
:target: https://coveralls.io/github/JohnPaton/ratelimitqueue
.. |Documentation Status| image:: https://readthedocs.org/projects/ratelimitqueue/badge/?version=latest
Expand Down
19 changes: 18 additions & 1 deletion docs/source/readme_manual.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,24 @@ black|
RateLimitQueue
==============

A thread safe, rate limited Python queue.
A rate limited wrapper for Python鈥檚 thread safe queues.

Some external APIs have rate limits that allow faster-than-consecutive
queries, e.g.聽if the rate limit is very high or the API response is
quite slow. To make the most of the API, the best option is often to
make API calls from multiple threads. Then you can put the requests or
URLs to call in a ``queue.Queue`` and have the threads consume the URLs
as they make the calls. However, you still have to make sure that the
total calls from all your threads don鈥檛 exceed the rate limit, which
requires some nontrivial coordination.

The ``ratelimitqueue`` package extends the three built-in Python queues
from from ``queue`` package - ``Queue``, ``LifeQueue``, and
``PriorityQueue`` - with configurable, rate limited counterparts.
Specifically, the ``get()`` method is rate limited across all threads so
that workers can safely consume from the queue in an unlimited loop, and
putting the items in the queue doesn鈥檛 need to require blocking the main
thread.

.. contents::

Expand Down
88 changes: 50 additions & 38 deletions ratelimitqueue/ratelimitqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,62 @@
import random

import queue
import multiprocessing.dummy as mp

from .exceptions import RateLimitException
from .utils import get_time_remaining


class RateLimitPutMixin:
"""Adds rate limiting to another class' `put()` method.
class RateLimitGetMixin:
"""Adds rate limiting to another class' `get()` method.
Assumes that the class being extended has properties `per` (float),
`fuzz` (float), and `_call_log` (queue.Queue), else will raise
AttributeError on call of put().
"""

def put(self, item, block=True, timeout=None):
def get(self, block=True, timeout=None):
"""
Put an item into the queue.
Get an item from the queue.
If optional args `block` is True and `timeout` is None (the default),
block if necessary until a free slot is available and the rate limit
has not been reached. If `timeout` is a non-negative number, it blocks
at most `timeout` seconds and raises the RateLimitException if
the required rate limit waiting time is shorter than the given timeout,
or the Full exception if no free slot was available within that time.
or the Empty exception if no item was available within that time.
Otherwise (`block` is False), put an item on the queue if a free slot
Otherwise (`block` is False), get an item on the queue if an item
is immediately available and the rate limit has not been hit. Else
raise the RateLimitException if waiting on the rate limit, or
Full exception if there is no slot available in the queue. Timeout
Empty exception if there is no item available in the queue. Timeout
is ignored in this case.
Parameters
----------
item : obj
The object to put in the queue
block : bool, optional, default True
Whether to block until the item can be put into the queue
Whether to block until an item can be gotten from the queue
timeout : float, optional, default None
The maximum amount of time to block for
"""
start = time.time()
self._pending_get.acquire(block, timeout if block else None)

if timeout is not None and timeout < 0:
raise ValueError("`timeout` must be a non-negative number")

if not hasattr(self, "per"):
raise AttributeError("RateLimitPut requires the `.per` property")

if not hasattr(self, "fuzz"):
raise AttributeError("RateLimitPut requires the `.fuzz` property")

if not hasattr(self, "_call_log"):
raise AttributeError(
"RateLimitPut requires the `._call_log` Queue"
)

if not hasattr(super(), "put"):
raise AttributeError(
"RateLimitPut must be mixed into a base class with"
" the `.put()` method"
)
# make sure child class has the required attributes
self._check_attributes()

# get snapshot of properties so no need to lock
per = self.per
fuzz = self.fuzz

if self._call_log.full():
if self._call_log.qsize() >= self.calls:
# get the earliest call in the queue
first_call = self._call_log.get()
self._call_log.task_done()

time_since_call = time.time() - first_call

Expand All @@ -89,6 +73,7 @@ def put(self, item, block=True, timeout=None):
time_remaining is not None
and time_remaining < sleep_time
):
self._call_log.task_done()
raise RateLimitException(
"Not enough time in timeout to wait for next slot"
)
Expand All @@ -97,8 +82,11 @@ def put(self, item, block=True, timeout=None):

# too fast but not blocking -> exception
else:
self._call_log.task_done()
raise RateLimitException("Too many requests")

self._call_log.task_done()

elif fuzz > 0:
time_remaining = get_time_remaining(start, timeout)
fuzz_time = random.uniform(0, fuzz)
Expand All @@ -116,13 +104,35 @@ def put(self, item, block=True, timeout=None):
if time_remaining is not None and time_remaining <= 0:
raise TimeoutError

super().put(item, block, timeout=time_remaining)

# log the call
# log the call and return the next item
self._call_log.put(time.time())
self._pending_get.release()
return super().get(block, timeout=time_remaining)

def _check_attributes(self):
if not hasattr(self, "per"):
raise AttributeError(
"RateLimitGetMixin requires the `.per` property"
)

if not hasattr(self, "fuzz"):
raise AttributeError(
"RateLimitGetMixin requires the `.fuzz` property"
)

if not hasattr(self, "_call_log"):
raise AttributeError(
"RateLimitGetMixin requires the `._call_log` Queue"
)

class RateLimitQueue(RateLimitPutMixin, queue.Queue):
if not hasattr(super(), "put"):
raise AttributeError(
"RateLimitGetMixin must be mixed into a base class with"
" the `.get()` method"
)


class RateLimitQueue(RateLimitGetMixin, queue.Queue):
def __init__(self, maxsize=0, calls=1, per=1.0, fuzz=0):
"""
A thread safe queue with a given maximum size and rate limit.
Expand All @@ -134,10 +144,10 @@ def __init__(self, maxsize=0, calls=1, per=1.0, fuzz=0):
`per` measured in seconds. The default rate limit is 1 call per
second. If `per` is <= 0, the rate limit is infinite.
To avoid immediately filling the whole queue at startup, an
To avoid immediately getting the maximum allowed items at startup, an
extra randomized wait period can be configured with `fuzz`.
This will cause the RateLimitQueue to wait between 0 and `fuzz`
seconds before putting the object in the queue. Fuzzing only
seconds before getting the object in the queue. Fuzzing only
occurs if there is no rate limit waiting to be done.
Parameters
Expand Down Expand Up @@ -198,9 +208,10 @@ def __init__(self, maxsize=0, calls=1, per=1.0, fuzz=0):
self.fuzz = float(fuzz)

self._call_log = queue.Queue(maxsize=self.calls)
self._pending_get = mp.Semaphore(1)


class RateLimitLifoQueue(RateLimitPutMixin, queue.LifoQueue):
class RateLimitLifoQueue(RateLimitGetMixin, queue.LifoQueue):
def __init__(self, maxsize=0, calls=1, per=1.0, fuzz=0):
"""
A thread safe LIFO queue with a given maximum size and rate limit.
Expand Down Expand Up @@ -275,9 +286,9 @@ def __init__(self, maxsize=0, calls=1, per=1.0, fuzz=0):
self.fuzz = float(fuzz)

self._call_log = queue.Queue(maxsize=self.calls)
self._pending_get = mp.Semaphore(1)


class RateLimitPriorityQueue(RateLimitPutMixin, queue.PriorityQueue):
class RateLimitPriorityQueue(RateLimitGetMixin, queue.PriorityQueue):
def __init__(self, maxsize=0, calls=1, per=1.0, fuzz=0):
"""
A thread safe priority queue with a given maximum size and rate
Expand Down Expand Up @@ -358,3 +369,4 @@ def __init__(self, maxsize=0, calls=1, per=1.0, fuzz=0):
self.fuzz = float(fuzz)

self._call_log = queue.Queue(maxsize=self.calls)
self._pending_get = mp.Semaphore(1)
10 changes: 10 additions & 0 deletions ratelimitqueue/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
import queue


def get_time_remaining(start, timeout=None):
Expand All @@ -8,3 +9,12 @@ def get_time_remaining(start, timeout=None):
time_elapsed = start - time.time()

return timeout - time_elapsed


def put_time_when_possible(q, timeout=None):
while True:
try:
q.put(time.time(), block=False, timeout=timeout)
return
except queue.Full:
pass
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name="ratelimitqueue",
version="0.1.0",
version="0.2.0",
description="A thread safe, rate limited queue.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit a2d709f

Please sign in to comment.