## The "Wait-Until" Rate-Limiter - Code Recipe

[Original repository on GitHub](https://github.com/Hvass-Labs/Code-Recipes)

Original author is [Magnus Erik Hvass Pedersen](http://www.hvass-labs.org/)

## Introduction

Rate-limiters are used to throttle events that arrive too rapidly, e.g. to prevent attacks on web-servers. The method presented here is simple and efficient. It maintains a "wait-until" time-stamp that is incremented whenever the rate-limiter is called (or "hit"). The caller can then either sleep for the required waiting-time, or raise an error if too many hits have accumulated. This works well for rate-limiting in multi-threaded and distributed environments, as it only requires an atomic update of the "wait-until" time-stamp.

The algorithm is apparently new, so let us call it the "wait-until" rate-limiter. It bears some similarity to the "leaky-bucket" algorithm as it also ensures a steady flow, but it doesn't explicitly require a queue in its implementation. It can also allow bursts simply by not sleeping, and only raising an error when too many hits have accumulated.
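The update rule can be stated compactly. As a sketch using symbols that do not appear in the original recipe, let `t` be the current time, `w` the stored "wait-until" time-stamp, `s` the number of seconds between two successive hits, and `L` the maximum number of seconds to wait. Each hit then computes the waiting-time `d` from the old time-stamp and replaces `w` by `w'`:

```latex
\begin{align}
d  &= w - t \\
w' &= \min\bigl(t + L,\ \max(w + s,\ t + s)\bigr)
\end{align}
```

The caller only has to wait when `d > 0`. For example, with `s = 1` and a stored `w = t + 2`, the hit must wait `d = 2` seconds and the new time-stamp becomes `w' = t + 3`, provided that `L` is at least 3.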

The algorithm is first demonstrated using basic Python, and then using MongoDB for saving and updating the "wait-until" time-stamp.

## Imports

In [1]:
import time
from datetime import datetime, timedelta

## Python Code

In [2]:
# Number of seconds to wait between two successive hits.
seconds = 1

# Number of seconds to accumulate before raising exception.
seconds_raise = 5

# Max number of seconds to wait.
seconds_limit = 10

# Convert to timedelta.
td_seconds = timedelta(seconds=seconds)
td_seconds_limit = timedelta(seconds=seconds_limit)

# Init "wait-until" time-stamp to sometime in the past.
# This would normally be saved in a database (see example further below).
_wait_until = datetime.utcnow() - timedelta(days=1)

In [3]:
def rate_limit():
    """
    Hit a rate-limiter and determine whether waiting is required. Successive
    calls (or "hits") to this function will increase the required waiting-time.
    """
    global _wait_until

    # Time-stamp for now.
    time_now = datetime.utcnow()

    # Time-stamp for the next wait-until, if not accumulating.
    time_wait_until = time_now + td_seconds

    # Time-stamp for the max allowed wait.
    time_limit = time_now + td_seconds_limit

    # Number of seconds to wait. Important to calculate this before update!
    wait_seconds = (_wait_until - time_now).total_seconds()

    print(f'Wait seconds: {wait_seconds:.2f}', end=' - ')

    # Update the saved "wait-until" time-stamp.
    # WARNING: This must be atomic to avoid race-conditions in multi-threading.
    _wait_until = min(time_limit, max(_wait_until + td_seconds, time_wait_until))

    # Raise exception if too many hits have accumulated.
    if wait_seconds > seconds_raise:
        print('raise error!')
        # raise RuntimeError()
        return

    # Sleep the CPU thread?
    if wait_seconds > 0:
        print('sleep.')
        # You can allow bursts by not sleeping here.
        # time.sleep(wait_seconds)
    else:
        print('proceed.')

In [4]:
# Simulate hits to the rate-limiter that don't have to wait.
for i in range(3):
    rate_limit()
    time.sleep(seconds*1.01)

Wait seconds: -86400.98 - proceed.
Wait seconds: -0.01 - proceed.
Wait seconds: -0.01 - proceed.


In [5]:
# Simulate hits to the rate-limiter that will sleep and raise errors.
for i in range(10):
    rate_limit()

Wait seconds: -0.02 - proceed.
Wait seconds: 1.00 - sleep.
Wait seconds: 2.00 - sleep.
Wait seconds: 3.00 - sleep.
Wait seconds: 4.00 - sleep.
Wait seconds: 5.00 - sleep.
Wait seconds: 6.00 - raise error!
Wait seconds: 7.00 - raise error!
Wait seconds: 8.00 - raise error!
Wait seconds: 9.00 - raise error!
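The warning about atomic updates in `rate_limit()` can be handled inside a single process by guarding the read and the update with a lock. The following is a minimal sketch and not part of the original recipe: the class name `WaitUntilLimiter`, its `hit()` method and the injectable `clock` argument are all hypothetical, and it uses float seconds from `time.monotonic` instead of `datetime` objects.

```python
import threading
import time


class WaitUntilLimiter:
    """In-process "wait-until" rate-limiter guarded by a lock (hypothetical)."""

    def __init__(self, seconds, seconds_limit, clock=time.monotonic):
        self.seconds = seconds              # Wait between two successive hits.
        self.seconds_limit = seconds_limit  # Max number of seconds to wait.
        self._clock = clock                 # Injectable clock, eases testing.
        self._lock = threading.Lock()
        self._wait_until = float('-inf')    # No hits have been made yet.

    def hit(self):
        """Register a hit and return the seconds to wait (<= 0: proceed)."""
        with self._lock:
            now = self._clock()
            # Read the old time-stamp before it is updated.
            wait_seconds = self._wait_until - now
            # Same update-rule as in rate_limit() above.
            self._wait_until = min(now + self.seconds_limit,
                                   max(self._wait_until + self.seconds,
                                       now + self.seconds))
        return wait_seconds
```

The caller decides what to do with the returned value, e.g. call `time.sleep()` when it is positive, or raise an error when it exceeds some threshold. A lock like this only protects threads in one process, so a distributed setup still needs an atomic update in shared storage.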


## MongoDB Code

The following code shows how to implement the "wait-until" rate-limiter using MongoDB to save and update the time-stamps. This code assumes you have MongoDB and pymongo installed; the update uses an aggregation pipeline with `$dateAdd`, which requires MongoDB 5.0 or newer. The code may also need some modification for your particular environment. On my local computer this adds around 1 millisecond of overhead, which is plenty fast for my use-cases, but if you need more speed then you will have to use a faster data-storage that also supports atomic updates with simple math operations.

In [None]:
import pymongo as pm

# MongoDB constants.
RETURN_BEFORE = pm.ReturnDocument.BEFORE

# Open database connection.
client = pm.MongoClient(host='mongodb://localhost:27017/')
db = client['my_db']

# Database collection for rate-limiters.
col = db['rate_limit']

# Create indexes.
col.create_index('key', unique=True)
col.create_index('wait_until')

In [None]:
def delete_old():
    """
    Delete all rate-limiters in the database that are no longer active.

    This could be run e.g. hourly in a cron-script to keep the database small.

    :return: Int with the number of documents deleted.
    """
    # Database filter for matching old / expired rate-limiters.
    filter = {'wait_until': {'$lt': datetime.utcnow()}}

    # Delete all matching docs and return how many were deleted.
    return col.delete_many(filter=filter).deleted_count
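As an alternative to a periodic clean-up script, MongoDB can expire the documents itself through a TTL index on the `wait_until` field. The sketch below is an assumption and not part of the original recipe; the helper name `create_ttl_index` is hypothetical. It only wraps pymongo's `create_index` with the `expireAfterSeconds` option.

```python
def create_ttl_index(collection, field='wait_until'):
    """
    Create a TTL index so MongoDB removes a document once the time-stamp
    stored in `field` has passed. Hypothetical helper for illustration.

    :param collection: pymongo collection holding the rate-limiters.
    :param field: String with the name of the date field.
    :return: String with the name of the created index.
    """
    # expireAfterSeconds=0 means "expire at the date stored in the field".
    return collection.create_index(field, expireAfterSeconds=0)


# Usage with the `col` collection from above (needs a running MongoDB):
# create_ttl_index(col)
```

A TTL index on `wait_until` would take the place of the plain index created earlier, because MongoDB rejects a second index on the same key with different options. The server's background task that removes expired documents runs about once a minute, so documents may linger briefly after expiring. That is harmless here, as an expired time-stamp already means no waiting is required.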

In [None]:
def rate_limit_mongodb(key, seconds_wait, seconds_raise=None,
                       seconds_limit=None, sleep=False):
    """
    Hit a rate-limiter and determine whether waiting is required. Successive
    calls (or "hits") to this function will increase the required waiting-time.

    This uses MongoDB to save the "wait-until" time-stamp.
    
    Time-usage for this function is about 1 millisecond on a local MongoDB.

    :param key:
        String with a unique ID for the rate-limiter.

    :param seconds_wait:
        Float with number of seconds to wait between two successive hits.

    :param seconds_raise:
        Float with number of seconds to accumulate before raising exception.
        If `None` then exception will not be raised.

    :param seconds_limit:
        Float with max number of seconds to wait.
        If `None` then set it to equal `seconds_wait`.

    :param sleep:
        Boolean whether to sleep CPU thread if waiting is required.
        You can allow bursts by not sleeping and just raise errors.
    
    :return:
        Boolean whether waiting is required.
    """
    # Database filter.
    filter = { 'key': key }
    
    # Time-stamp for now.
    time_now = datetime.utcnow()
    
    # Time-stamp for the next wait-until, if not accumulating.
    time_wait_until = time_now + timedelta(seconds=seconds_wait)
    
    # Time-stamp for the max allowed wait.
    if seconds_limit is None:
        # Use seconds_wait again.
        time_limit = time_wait_until
    else:
        # Use seconds_limit.
        time_limit = time_now + timedelta(seconds=seconds_limit)

    # Wrong update-rule which does not accumulate the wait-time, so it doesn't
    # actually delay N rapid requests by N * seconds_wait, but only delays them
    # by a total of seconds_wait.
    # update = {'$set': filter | {'wait_until': time_wait_until}}

    # Update-rule which accumulates the wait-time correctly. It is basically:
    # doc['wait_until'] = min(time_limit, max(time_wait_until,
    #                         doc['wait_until'] + seconds_wait))
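    # Note: $dateAdd needs an integer amount, so use whole milliseconds to
    # also support fractional values of seconds_wait.
    update_time_add = \
        {'$dateAdd': {'startDate': '$wait_until', 'unit': 'millisecond',
                      'amount': int(round(seconds_wait * 1000))}}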
    update_time_max = {'$max': [update_time_add, time_wait_until]}
    update_wait_until = {'$min': [time_limit, update_time_max]}

    # Database update-rule. This must be a pipeline / list to work correctly.
    update = [{'$set': filter},
              {'$set': {'wait_until': update_wait_until}}]

    # Find and update document. Insert new doc if it doesn't exist.
    # It is important to return the document BEFORE it was updated.
    doc = col.find_one_and_update(filter=filter, update=update,
                                  return_document=RETURN_BEFORE, upsert=True)

    # Did the document already exist?
    if doc is not None:
        # Number of seconds to wait.
        wait_seconds = (doc['wait_until'] - time_now).total_seconds()

        # Raise exception if too many hits have accumulated.
        if seconds_raise is not None and wait_seconds >= seconds_raise:
            raise RuntimeError('Rate-limiter.')

        # Boolean whether waiting is required.
        must_wait = (wait_seconds >= 0)
        
        # Sleep the CPU thread?
        if must_wait and sleep:
            time.sleep(wait_seconds)
    else:
        # Waiting is not required.
        must_wait = False

    return must_wait

In [None]:
# Test the rate-limiter.
for i in range(10):
    rate_limit_mongodb(key='some_unique_id', seconds_wait=5,
                       seconds_raise=30, seconds_limit=120, sleep=True)
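In an application the rate-limiter is usually hit right before some protected function is run. One way to wire this up is a decorator, sketched below under some assumptions: the names `rate_limited` and `RateLimitExceeded` are hypothetical and not part of the original recipe, and `limiter` is any zero-argument callable that raises `RuntimeError` when too many hits have accumulated, as `rate_limit_mongodb()` does.

```python
import functools


class RateLimitExceeded(Exception):
    """Hypothetical error type for calls rejected by the rate-limiter."""


def rate_limited(limiter):
    """
    Decorator that hits the rate-limiter before calling the function.

    :param limiter: Zero-argument callable that raises RuntimeError
                    when too many hits have accumulated.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                limiter()
            except RuntimeError as e:
                # Translate into a dedicated error that callers can handle.
                raise RateLimitExceeded(func.__name__) from e
            return func(*args, **kwargs)
        return wrapper
    return decorator


# Example wiring (needs a running MongoDB):
# limiter = functools.partial(rate_limit_mongodb, key='login',
#                             seconds_wait=5, seconds_raise=30,
#                             seconds_limit=120)
#
# @rate_limited(limiter)
# def login(): ...
```

Catching `RuntimeError` this broadly would also translate unrelated runtime errors raised inside the limiter, so a dedicated exception class raised by the rate-limiter itself might be a cleaner design.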

## License (MIT)

This is published under the [MIT License](https://github.com/Hvass-Labs/Code-Recipes/blob/main/LICENSE) which allows very broad use for both academic and commercial purposes.

You are very welcome to modify and use this source-code in your own project. Please keep a link to the [original repository](https://github.com/Hvass-Labs/Code-Recipes).