
Implemented bucket algorithm for handling bursts / simultaneous requests #373

Merged (15 commits, Oct 23, 2017)
ccxt/async/exchange.py (24 additions, 0 deletions)

@@ -30,6 +30,9 @@

```python
import asyncio
import concurrent
import socket
import time
import math
import random

# -----------------------------------------------------------------------------
```
@@ -72,6 +75,26 @@ def __del__(self):

```python
    # def run_rest_poller_loop
    #     await asyncio.sleep (exchange.rateLimit / 1000.0)

    async def wait_for_token(self):
        while self.rateLimitTokens <= 1:
            # if self.verbose:
            #     print('Waiting for tokens: Exchange: {0}'.format(self.id))
            self.add_new_tokens()
            seconds_delays = [0.01, 0.1, 0.7, 1, 1.5, 2]
            delay = random.choice(seconds_delays)
```
kroitor (Member) commented:

@samholt, these two lines are not very clear to me, namely the choice of 0.01, 0.1, 0.7, 1, 1.5 and 2. I can see that you are randomizing the delays here, but I'm not sure why. Can you please elaborate on that?

samholt (Contributor Author) replied on Oct 23, 2017:

Certainly. This comes from CS/EE theory on how routers and Ethernet scale bandwidth to the optimal rate for a shared resource. The Ethernet protocol, or similarly the IEEE 802.3 CSMA/CD standard, implements an exponential backoff algorithm (https://en.wikipedia.org/wiki/Exponential_backoff) to avoid multiple simultaneous accesses to the same resource (a collision); here it prevents hundreds of asyncio tasks from checking for newly available tokens at the same time. In theory the randomised delays should follow a process like https://en.wikipedia.org/wiki/Exponential_backoff#Example_exponential_backoff_algorithm, but the simplified version here just collapses steps 1-3 into an approximation, which is, crudely, the set above. On second thought, the delays should be exponentially distributed within the set: assuming a minimum delay of 1 ms, a maximum of 500 ms and a set of 5, that gives [0.001, 0.005, 0.022, 0.106, 0.5]:

```python
import numpy as np

# five delays spaced evenly in log-space between 1 ms and 500 ms
x = np.linspace(np.log(0.001), np.log(0.5), 5)
y = np.exp(x)

for i in y:
    print(round(i, 3))  # 0.001, 0.005, 0.022, 0.106, 0.5
```
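For reference, here is a minimal sketch of the textbook truncated exponential backoff that the linked article describes; `backoff_delay` is a hypothetical illustration, not part of this PR:

```python
import random

def backoff_delay(attempt, base=0.001, cap=0.5):
    # hypothetical helper: random delay in [0, base * 2**attempt], truncated at cap seconds
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

Each waiting task would bump its own attempt counter whenever it fails to get a token, so contention thins out exponentially instead of every task re-polling on the same schedule.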

```python
            await asyncio.sleep(delay)
        self.rateLimitTokens -= 1

    def add_new_tokens(self):
        # if self.verbose:
        #     print('Adding new tokens: Exchange: {0}'.format(self.id))
        now = time.monotonic()
        time_since_update = now - self.rateLimitUpdateTime
        new_tokens = math.floor((0.8 * 1000.0 * time_since_update) / self.rateLimit)
```
kroitor (Member) commented:

... and this one: the 0.8 magic constant looks mysterious ) I may be missing something here %)

samholt (Contributor Author) replied on Oct 23, 2017:

Initially the new-token rate was to be calculated dynamically, based on how many tokens are handed out up front, so that over a fixed time span (e.g. 60 seconds) the bucket algorithm sends the same number of requests as a single delayed poller issuing one request per rateLimit period. That stays under the rate limit while supporting bursts of requests over time, for optimal request rates.

The theory is as follows:

  1. A traditional linear rate-limit poller with rateLimit = 1.5 seconds will, over a period of, say, T = 60 seconds, send 60 / 1.5 = 40 requests; each request here is represented as a token.
  2. The dynamic rate limiter must still send 40 requests or fewer over the same period T = 60 seconds. So if the first burst takes 16 requests, that leaves 40 - 16 = 24 requests, which should be added back to the token bucket linearly over the period T, i.e. one new token/request every 60 / 24 = 2.5 seconds. This meets the original rate limit while allowing bursts of requests over short periods. Thus new_tokens = (1 / 2.5) * time_since_update.

These simple equations can be represented as (the original comment attached a screenshot of them; in essence):

    token_interval = T / (T / rateLimit - rateLimitMaxTokens) = 60 / (40 - 16) = 2.5 s
    new_tokens = time_since_update / token_interval

However, computing this for a typical exchange with the rateLimit defined above, we can express the new-token rate as a fraction of the old rate limit, i.e. 1.5/2.5, which can be factored out to yield:

    new_tokens = (1.5 / 2.5) * (1 / 1.5) * time_since_update
    new_tokens = 0.6 * (1 / rateLimit) * time_since_update

Thus with the constants defined above we should take roughly 0.6, or 60%, of the linear rate limit. Although this works, repeated tests showed that exchanges can be throttled faster than this without returning DDoS errors. The 0.8, or 80% of (1/rateLimit), was one of the highest rates at which exchanges could still be throttled without reporting DDoS or rate-limit errors. Keen to hear everyone's views on this, especially whether we should stick to the lower limits, what time period T exchanges usually calculate their rate limits over, and whether we should always try to maximise the rate at which we can send requests.
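A quick sanity check of that arithmetic (a standalone sketch, not PR code; the variable names mirror the comment above):

```python
rate_limit = 1.5                # seconds between requests for a linear poller
T = 60.0                        # accounting window in seconds
burst = 16                      # initial bucket size (rateLimitMaxTokens)

budget = T / rate_limit         # 40 requests allowed in the window
remaining = budget - burst      # 24 tokens left to drip back in
token_interval = T / remaining  # 2.5 seconds per new token

print(token_interval, rate_limit / token_interval)  # 2.5 0.6 -> the 0.6 factor; the PR raises it to 0.8
```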

```python
        if new_tokens > 1:
            self.rateLimitTokens = min(self.rateLimitTokens + new_tokens, self.rateLimitMaxTokens)
            self.rateLimitUpdateTime = now

    async def fetch(self, url, method='GET', headers=None, body=None):
        """Perform a HTTP request and return decoded JSON data"""
        headers = headers or {}
```
@@ -88,6 +111,7 @@ async def fetch(self, url, method='GET', headers=None, body=None):

```python
        print(url, method, url, "\nRequest:", headers, body)
        encoded_body = body.encode() if body else None
        session_method = getattr(self.aiohttp_session, method.lower())
        await self.wait_for_token()
        try:
            async with session_method(url, data=encoded_body, headers=headers, timeout=(self.timeout / 1000), proxy=self.aiohttp_proxy) as response:
                text = await response.text()
```
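Since fetch() now awaits wait_for_token() before every HTTP request, the throttle applies transparently to any burst of concurrent unified calls. A minimal usage sketch (assuming an already-constructed async exchange instance):

```python
import asyncio

async def burst_tickers(exchange, symbols):
    # each fetch_ticker() goes through fetch(), which awaits wait_for_token(),
    # so a burst larger than rateLimitMaxTokens is spread out automatically
    return await asyncio.gather(*[exchange.fetch_ticker(s) for s in symbols])
```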
ccxt/exchange.py (3 additions, 0 deletions)

@@ -142,6 +142,9 @@ class Exchange(object):

```python
    lastRestPollTimestamp = 0
    restRequestQueue = None
    restPollerLoopIsRunning = False
    rateLimitTokens = 16
    rateLimitMaxTokens = 16
    rateLimitUpdateTime = 0
    last_http_response = None
    last_json_response = None
```
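To get a feel for what these defaults mean in practice, a back-of-the-envelope check (standalone sketch, not PR code; the rateLimit value is an assumed example):

```python
rateLimit = 1500  # ms, a typical per-request interval in ccxt exchange descriptions

refill_per_second = 0.8 * 1000.0 / rateLimit  # ~0.53 tokens per second
seconds_per_token = 1 / refill_per_second     # ~1.9 s between new requests

# starting from a full bucket of rateLimitMaxTokens = 16, a burst can spend
# all 16 tokens at once; after that, elapsed time t yields
# floor(refill_per_second * t) new tokens, capped back at 16
print(round(refill_per_second, 2), round(seconds_per_token, 2))  # 0.53 1.88
```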
test/test_async.py (12 additions, 1 deletion)

@@ -144,7 +144,18 @@ async def test_tickers(exchange, symbol):

```diff
         tickers = await exchange.fetch_tickers([symbol])
         dump(green(exchange.id), 'fetched', green(len(list(tickers.keys()))), 'tickers')
     else:
-        dump(yellow(exchange.id), 'fetching all tickers at once not supported')
+        dump(green(exchange.id), 'fetching all tickers by simultaneous multiple concurrent requests')
+        # some exchanges cannot fetch tickers for all of their listed symbols
+        symbols_to_load = [symbol for symbol in exchange.symbols if '.d' not in symbol]
+        if exchange.id == 'bitmex':
+            symbols_to_load = ['BTC/USD', 'B_BLOCKSZ17', 'DASHZ17', 'ETC7D', 'ETHZ17', 'LTCZ17', 'XBJZ17', 'XBTZ17', 'XMRZ17', 'XRPZ17', 'XTZZ17', 'ZECZ17']
```
kroitor (Member) commented on Oct 23, 2017:

Unfortunately, this line for Bitmex will break the test upon monthly contract expiration. Basically, their tickers change continuously, so we have to make another workaround for it. But this isn't a big problem; we will fix it anyway.

samholt (Contributor Author) replied:

Interesting, yes. It was just a quick fix: errors were occurring when attempting to fetch individual tickers for every symbol in bitmex.symbols; it seems some of them are not supported, yet still listed? Keen to see your fix :)

```diff
+        elif exchange.id == 'bl3p':
+            symbols_to_load = ['BTC/EUR']
+        elif exchange.id == 'virwox':
+            symbols_to_load = [symbol for symbol in symbols_to_load if symbol != 'CHF/SLL']
+        input_coroutines = [exchange.fetchTicker(symbol) for symbol in symbols_to_load]
+        tickers = await asyncio.gather(*input_coroutines)
```
kroitor (Member) commented on Oct 23, 2017:

Even though sending 16 requests at once to fetch tickers can work, it will most likely result in a ban from Bittrex or Kraken if you try doing the same with private requests or with continuous L2 orderbook polling...

samholt (Contributor Author) replied:

Interesting. It sends 16 requests, then allows new requests at slightly less than the rate limit, to make sure the exchange's specified rate limit is met over a fixed time period. Looking at the Bittrex API documentation (https://bittrex.com/home/api), I can't readily see any information about their API rate limit. Multiple tests against their API with this seem to work, though long polling of continuous order books is yet to be tested (I plan to add a test that loads all order books for all exchanges). Yes, I agree about Kraken in its current implementation; we can make it work for Kraken by setting a lower starting 'rateLimitMaxTokens': 8 and 'rateLimitTokens': 8.
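A sketch of that per-exchange override (assuming, as elsewhere in ccxt, that constructor config keys are copied onto the instance; the module path is as of this PR and was later renamed ccxt.async_support):

```python
import ccxt.async as ccxt

kraken = ccxt.kraken({
    'rateLimitMaxTokens': 8,  # smaller bucket: at most 8 requests per burst
    'rateLimitTokens': 8,     # start with the bucket full
})
```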

kroitor (Member) replied on Oct 23, 2017:

> It sends 16 requests, then allows new requests at slightly less than the rate limit, to make sure the exchange's specified rate limit is met over a fixed time period.

Yep, and if those 16 requests are private requests (fetching balance, placing and canceling orders, fetching order/trade history), they will fail most of the time, because practically none of the exchanges tolerates that rate.

> Looking at the Bittrex API documentation, I can't readily see any information about their API rate limit.

Yes, they don't have it documented, but it's been reverse-engineered to be approximately 1 request per second. They don't want to be transparent; I don't really know why. Liqui will also complain upon querying at rates higher than one or two requests per second. And it takes Kraken approx 10 (!) seconds to process an order, and it has been so for the past few weeks, so I guess sending 16 of them at once is not our option ) I am still adding some minor edits here and there to complete the merge, but most of it is already there ;)

samholt (Contributor Author) replied:

Very good, agreed. Perhaps we should limit the asyncio throttle to public calls only, and keep the old rate-limit delay structure for private API calls? How do we solve this?

kroitor (Member) replied on Oct 23, 2017:

I would say we are fine. It's just that I made that functionality optional, so users may or may not enable it at their discretion, depending on their use case. If they design a poller application for alerts or something, they can go ahead and turn it full on (no warranties though), and if they want to do trading, they can leave it off until we come up with a generic solution.
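In released ccxt versions this opt-in toggle surfaces as the enableRateLimit config flag; a sketch of the two modes described above (the flag name is from later ccxt releases, not this PR):

```python
import ccxt.async as ccxt

# poller / alerts use case: opt in to the built-in throttle
poller = ccxt.bittrex({'enableRateLimit': True})

# trading use case: leave it off and pace private calls manually
trader = ccxt.bittrex({'enableRateLimit': False})
```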

samholt (Contributor Author) replied:

Nice, very good.

```diff
+        dump(green(exchange.id), 'fetched', green(len(list(symbols_to_load))), 'tickers')
 
 # ------------------------------------------------------------------------------
```