Skip to content

Commit

Permalink
Don't blindly create coroutines, but fire them off in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
xmatthias committed Sep 10, 2021
1 parent 7251a3a commit a19c33b
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions freqtrade/exchange/exchange.py
Expand Up @@ -28,7 +28,7 @@
from freqtrade.exchange.common import (API_FETCH_ORDER_RETRY_COUNT, BAD_EXCHANGES,
EXCHANGE_HAS_OPTIONAL, EXCHANGE_HAS_REQUIRED, retrier,
retrier_async)
from freqtrade.misc import deep_merge_dicts, safe_value_fallback2
from freqtrade.misc import chunks, deep_merge_dicts, safe_value_fallback2
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist


Expand Down Expand Up @@ -1238,19 +1238,20 @@ async def _async_get_historic_ohlcv(self, pair: str, timeframe: str,
input_coroutines = [self._async_get_candle_history(
pair, timeframe, since) for since in
range(since_ms, arrow.utcnow().int_timestamp * 1000, one_call)]

results = await asyncio.gather(*input_coroutines, return_exceptions=True)

# Combine gathered results

data: List = []
for res in results:
if isinstance(res, Exception):
logger.warning("Async code raised an exception: %s", res.__class__.__name__)
continue
# Deconstruct tuple if it's not an exception
p, _, new_data = res
if p == pair:
data.extend(new_data)
for input_coro in chunks(input_coroutines, 100):

results = await asyncio.gather(*input_coro, return_exceptions=True)
for res in results:
if isinstance(res, Exception):
logger.warning("Async code raised an exception: %s", res.__class__.__name__)
continue
# Deconstruct tuple if it's not an exception
p, _, new_data = res
if p == pair:
data.extend(new_data)
# Sort data again after extending the result - above calls return in "async order"
data = sorted(data, key=lambda x: x[0])
logger.info("Downloaded data for %s with length %s.", pair, len(data))
Expand Down

0 comments on commit a19c33b

Please sign in to comment.