From ddaaf13893c74dc1338879a1d3d2fd62ba63c168 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 8 Sep 2021 02:15:43 +0300 Subject: [PATCH] Fix indexes in realtime status missing head relation --- src/dipdup/datasources/tzkt/datasource.py | 5 +++++ src/dipdup/http.py | 2 +- src/dipdup/index.py | 21 +++++++++++---------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py index 200a8b209..585451132 100644 --- a/src/dipdup/datasources/tzkt/datasource.py +++ b/src/dipdup/datasources/tzkt/datasource.py @@ -359,6 +359,10 @@ def level(self) -> Optional[int]: def sync_level(self) -> Optional[int]: return self._sync_level + @property + def head(self) -> Optional[Head]: + return self._head + async def get_similar_contracts(self, address: str, strict: bool = False) -> List[str]: """Get list of contracts sharing the same code hash or type hash""" entrypoint = 'same' if strict else 'similar' @@ -716,6 +720,7 @@ async def _on_head_message(self, message: List[Dict[str, Any]]) -> None: created = False if self._head is None: self._head, created = await Head.get_or_create( + # NOTE: It would be better to use datasource name but it's not available name=self._http._url, defaults=dict( level=block.level, diff --git a/src/dipdup/http.py b/src/dipdup/http.py index b5637d63d..9a9ad6c68 100644 --- a/src/dipdup/http.py +++ b/src/dipdup/http.py @@ -72,7 +72,7 @@ async def __aenter__(self) -> None: """Create underlying aiohttp session""" self.__session = aiohttp.ClientSession( connector=aiohttp.TCPConnector(limit=self._config.connection_limit or 100), - timeout=aiohttp.ClientTimeout(self._config.connection_timeout or 60), + timeout=aiohttp.ClientTimeout(connect=self._config.connection_timeout or 60), ) async def __aexit__(self, exc_type, exc, tb): diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 967214bc4..e235045b8 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -40,17 +40,12 @@ def __init__(self, ctx: DipDupContext, config: ResolvedIndexConfigT, datasource: self._datasource = datasource self._logger = FormattedLogger('dipdup.index', fmt=f'{config.name}: ' + '{}') - self._head: Optional[models.Head] = None self._state: Optional[models.Index] = None @property def datasource(self) -> TzktDatasource: return self._datasource - @property - def head(self) -> Optional[models.Head]: - return self._head - @property def state(self) -> models.Index: if self._state is None: @@ -124,6 +119,7 @@ async def _enter_sync_state(self, last_level: int) -> Optional[int]: async def _exit_sync_state(self, last_level: int) -> None: self._logger.info('Index is synchronized to level %s', last_level) + # NOTE: No head yet, wait for realtime messages to be processed await self.state.update_status(IndexStatus.REALTIME, last_level) @@ -231,9 +227,11 @@ async def _process_level_operations(self, level: int, operations: List[Operation self._logger.info('Processing %s operations of level %s', len(operations), level) await self._process_operations(operations) - status = IndexStatus.REALTIME if block else IndexStatus.SYNCING - # FIXME: Not obvious: receiving `BlockData`, sending `Head` - await self.state.update_status(status, level, self.head if block else None) + if block: + status, head = IndexStatus.REALTIME, self.datasource.head + else: + status, head = IndexStatus.SYNCING, None + await self.state.update_status(status, level, head) async def _match_operation(self, pattern_config: OperationHandlerPatternConfigT, operation: OperationData) -> bool: """Match single operation with pattern""" @@ -459,8 +457,11 @@ async def _process_level_big_maps(self, level: int, big_maps: List[BigMapData], self._logger.info('Processing %s big map diffs of level %s', len(big_maps), level) await self._process_big_maps(big_maps) - status = IndexStatus.REALTIME if block else IndexStatus.SYNCING - await self.state.update_status(status, level, self.datasource.block if block else None) + if block: + status, head = IndexStatus.REALTIME, self.datasource.head + else: + status, head = IndexStatus.SYNCING, None + await self.state.update_status(status, level, head) async def _match_big_map(self, handler_config: BigMapHandlerConfig, big_map: BigMapData) -> bool: """Match single big map diff with pattern"""