Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
CONFIGURE_HANDLER = 'on_configure'
BLOCK_HANDLER = 'on_block'
ENV_VARIABLE_REGEX = r'\${([\w]*):-(.*)}'
DEFAULT_RETRY_COUNT = 3
DEFAULT_RETRY_SLEEP = 1

sys.path.append(os.getcwd())
_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -140,6 +142,10 @@ class TzktDatasourceConfig(NameMixin):
kind: Literal['tzkt']
url: str

cache: Optional[bool] = None
retry_count: int = DEFAULT_RETRY_COUNT
retry_sleep: int = DEFAULT_RETRY_SLEEP

def __hash__(self):
return hash(self.url)

Expand All @@ -162,6 +168,10 @@ class BcdDatasourceConfig(NameMixin):
url: str
network: str

cache: Optional[bool] = None
retry_count: int = DEFAULT_RETRY_COUNT
retry_sleep: int = DEFAULT_RETRY_SLEEP

def __hash__(self):
return hash(self.url + self.network)

Expand All @@ -180,6 +190,10 @@ class CoinbaseDatasourceConfig(NameMixin):
secret_key: Optional[str] = None
passphrase: Optional[str] = None

cache: Optional[bool] = None
retry_count: int = DEFAULT_RETRY_COUNT
retry_sleep: int = DEFAULT_RETRY_SLEEP

def __hash__(self):
return hash(self.kind)

Expand Down
4 changes: 2 additions & 2 deletions src/dipdup/datasources/bcd/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@


class BcdDatasource:
def __init__(self, url: str, network: str, cache: bool) -> None:
def __init__(self, url: str, network: str, proxy=DatasourceRequestProxy()) -> None:
self._url = url.rstrip('/')
self._network = network
self._proxy = proxy
self._logger = logging.getLogger('dipdup.bcd')
self._proxy = DatasourceRequestProxy(cache)

async def close_session(self) -> None:
await self._proxy.close_session()
Expand Down
9 changes: 2 additions & 7 deletions src/dipdup/datasources/coinbase/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

from aiolimiter import AsyncLimiter

from dipdup.datasources.coinbase.models import CandleData, CandleInterval
from dipdup.datasources.proxy import DatasourceRequestProxy

Expand All @@ -13,12 +11,9 @@


class CoinbaseDatasource:
def __init__(self, cache: bool) -> None:
def __init__(self, proxy: DatasourceRequestProxy) -> None:
self._logger = logging.getLogger('dipdup.coinbase')
self._proxy = DatasourceRequestProxy(
cache=cache,
ratelimiter=AsyncLimiter(max_rate=10, time_period=1),
)
self._proxy = proxy

async def close_session(self) -> None:
await self._proxy.close_session()
Expand Down
39 changes: 28 additions & 11 deletions src/dipdup/datasources/proxy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import hashlib
import logging
import pickle
Expand All @@ -7,16 +8,40 @@
from aiolimiter import AsyncLimiter
from fcache.cache import FileCache # type: ignore

from dipdup.config import DEFAULT_RETRY_COUNT, DEFAULT_RETRY_SLEEP # type: ignore
from dipdup.utils import http_request


class DatasourceRequestProxy:
def __init__(self, cache: bool = False, ratelimiter: Optional[AsyncLimiter] = None) -> None:
"""Wrapper for datasource HTTP requests.

Covers caching, retrying failed requests and ratelimiting"""

def __init__(
self,
cache: bool = False,
retry_count: int = DEFAULT_RETRY_COUNT,
retry_sleep: int = DEFAULT_RETRY_SLEEP,
ratelimiter: Optional[AsyncLimiter] = None,
) -> None:
self._logger = logging.getLogger(__name__)
self._cache = FileCache('dipdup', flag='cs') if cache else None
self._retry_count = retry_count
self._retry_sleep = retry_sleep
self._ratelimiter = ratelimiter
self._session = aiohttp.ClientSession()

async def _wrapped_request(self, method: str, **kwargs):
for attempt in range(self._retry_count):
self._logger.debug('Datasource request attempt %s/%s', attempt + 1, self._retry_count)
try:
return await http_request(self._session, method, **kwargs)
except (aiohttp.ClientConnectionError, aiohttp.ClientConnectorError) as e:
if attempt + 1 == self._retry_count:
raise e
self._logger.warning('Datasource request failed: %s', e)
await asyncio.sleep(self._retry_sleep)

async def http_request(self, method: str, skip_cache: bool = False, weight: int = 1, **kwargs):
if self._cache is not None and not skip_cache:
key = hashlib.sha256(pickle.dumps([method, kwargs])).hexdigest()
Expand All @@ -25,21 +50,13 @@ async def http_request(self, method: str, skip_cache: bool = False, weight: int
except KeyError:
if self._ratelimiter:
await self._ratelimiter.acquire(weight)
response = await http_request(
session=self._session,
method=method,
**kwargs,
)
response = await self._wrapped_request(method, **kwargs)
self._cache[key] = response
return response
else:
if self._ratelimiter:
await self._ratelimiter.acquire(weight)
response = await http_request(
session=self._session,
method=method,
**kwargs,
)
response = await self._wrapped_request(method, **kwargs)
return response

async def close_session(self) -> None:
Expand Down
8 changes: 6 additions & 2 deletions src/dipdup/datasources/tzkt/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,21 @@ class TzktDatasource(IndexDatasource):
* Calls Matchers to match received operation groups with indexes' pattern and spawn callbacks on match
"""

def __init__(self, url: str, cache: bool) -> None:
def __init__(
self,
url: str,
proxy: DatasourceRequestProxy,
) -> None:
super().__init__()
self._url = url.rstrip('/')
self._proxy = proxy

self._logger = logging.getLogger('dipdup.tzkt')
self._transaction_subscriptions: Set[str] = set()
self._origination_subscriptions: bool = False
self._big_map_subscriptions: Dict[str, List[str]] = {}

self._client: Optional[BaseHubConnection] = None
self._proxy = DatasourceRequestProxy(cache)

self._level: Optional[int] = None
self._sync_level: Optional[int] = None
Expand Down
25 changes: 22 additions & 3 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from posix import listdir
from typing import Dict, List, cast

from aiolimiter import AsyncLimiter
from apscheduler.schedulers import SchedulerNotRunningError # type: ignore
from genericpath import exists
from tortoise import Tortoise
Expand Down Expand Up @@ -33,6 +34,7 @@
from dipdup.datasources.bcd.datasource import BcdDatasource
from dipdup.datasources.coinbase.datasource import CoinbaseDatasource
from dipdup.datasources.datasource import IndexDatasource
from dipdup.datasources.proxy import DatasourceRequestProxy
from dipdup.datasources.tzkt.datasource import TzktDatasource
from dipdup.exceptions import ConfigurationError
from dipdup.hasura import configure_hasura
Expand Down Expand Up @@ -228,20 +230,37 @@ async def _create_datasources(self) -> None:
if name in self._datasources:
continue

cache = self._config.cache_enabled if datasource_config.cache is None else datasource_config.cache
if isinstance(datasource_config, TzktDatasourceConfig):
proxy = DatasourceRequestProxy(
cache=cache,
retry_count=datasource_config.retry_count,
retry_sleep=datasource_config.retry_sleep,
)
datasource = TzktDatasource(
url=datasource_config.url,
cache=self._config.cache_enabled,
proxy=proxy,
)
elif isinstance(datasource_config, BcdDatasourceConfig):
proxy = DatasourceRequestProxy(
cache=cache,
retry_count=datasource_config.retry_count,
retry_sleep=datasource_config.retry_sleep,
)
datasource = BcdDatasource(
url=datasource_config.url,
network=datasource_config.network,
cache=self._config.cache_enabled,
proxy=proxy,
)
elif isinstance(datasource_config, CoinbaseDatasourceConfig):
proxy = DatasourceRequestProxy(
cache=cache,
retry_count=datasource_config.retry_count,
retry_sleep=datasource_config.retry_sleep,
ratelimiter=AsyncLimiter(max_rate=10, time_period=1),
)
datasource = CoinbaseDatasource(
cache=self._config.cache_enabled,
proxy=proxy,
)
else:
raise NotImplementedError
Expand Down