From a8c64ed76b1b04c3b0874ac96d8c322935073edd Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Fri, 1 Oct 2021 16:18:59 +0300
Subject: [PATCH 01/29] Track datasource level by channel, ignore head rollbacks

---
 src/dipdup/datasources/tzkt/datasource.py | 26 ++++++++++++-----------
 src/dipdup/dipdup.py                      | 20 ++++++++----------
 2 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py
index f0d54fba9..76a0b7de8 100644
--- a/src/dipdup/datasources/tzkt/datasource.py
+++ b/src/dipdup/datasources/tzkt/datasource.py
@@ -310,7 +310,7 @@ def __init__(
         self._big_map_subscriptions: Dict[str, Set[str]] = {}
         self._ws_client: Optional[BaseHubConnection] = None
-        self._level: Optional[int] = None
+        self._level: DefaultDict[MessageType, Optional[int]] = defaultdict(None)
         self._sync_level: Optional[int] = None
 
     @property
@@ -656,26 +656,28 @@ async def _extract_message_data(self, type_: MessageType, message: List[Any]) ->
         # TODO: Docstring
         for item in message:
             tzkt_type = TzktMessageType(item['type'])
-            head_level = item['state']
+            level, last_level = item['state'], self._level[type_]
+            self._level[type_] = level
 
-            self._logger.info('Realtime message received: %s, %s', type_, tzkt_type)
+            self._logger.info('Realtime message received: %s, %s, %s -> %s', type_.value, tzkt_type.name, last_level, level)
 
             # NOTE: State messages will be replaced with WS negotiation some day
             if tzkt_type == TzktMessageType.STATE:
-                if self._sync_level != head_level:
-                    self._logger.info('Datasource level set to %s', head_level)
-                    self._sync_level = head_level
-                    self._level = head_level
+                if self._sync_level != level:
+                    self._logger.info('Datasource sync level set to %s', level)
+                    self._sync_level = level
 
             elif tzkt_type == TzktMessageType.DATA:
-                self._level = head_level
                 yield item['data']
 
             elif tzkt_type == TzktMessageType.REORG:
-                if self._level is None:
-                    raise RuntimeError('Reorg message received but datasource is not connected')
-                self._logger.info('Emitting rollback from %s to %s', self._level, head_level)
-                await self.emit_rollback(self._level, head_level)
+                if last_level is None:
+                    raise RuntimeError('Reorg message received but level is not set')
+                # NOTE:
+                if type_ == MessageType.head:
+                    return
+                self._logger.info('Emitting rollback from %s to %s', last_level, level)
+                await self.emit_rollback(last_level, level)
 
             else:
                 raise NotImplementedError

diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py
index 1111208f7..5e436cf4f 100644
--- a/src/dipdup/dipdup.py
+++ b/src/dipdup/dipdup.py
@@ -5,7 +5,7 @@
 from contextlib import AsyncExitStack, asynccontextmanager, suppress
 from functools import partial
 from operator import ne
-from typing import Awaitable, Deque, Dict, List, Optional, Set
+from typing import Awaitable, Deque, Dict, List, Optional, Set, cast
 
 from apscheduler.events import EVENT_JOB_ERROR  # type: ignore
 from tortoise.exceptions import OperationalError
@@ -169,18 +169,16 @@ async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapDa
             index.push(level, big_maps)
 
     async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
-        # NOTE: Rollback could be received before head
-        if from_level - to_level in (0, 1):
+        if from_level - to_level == 1:
             # NOTE: Single level rollbacks are processed at Index level.
             # NOTE: Notify all indexes which use rolled back datasource to drop duplicated operations from the next block
-            for index in self._indexes.values():
-                if index.datasource == datasource:
-                    # NOTE: Continue to rollback with handler
-                    if not isinstance(index, OperationIndex):
-                        self._logger.info('Single level rollback is not supported by `%s` indexes', index._config.kind)
-                        break
-                    await index.single_level_rollback(from_level)
-            else:
+            indexes = iter(self._indexes.values())
+            matching_indexes = filter(lambda index: index.datasource == datasource, indexes)
+            all_indexes_are_operation = all(isinstance(index, OperationIndex) for index in matching_indexes)
+
+            if all_indexes_are_operation:
+                for index in matching_indexes:
+                    await cast(OperationIndex, index).single_level_rollback(from_level)
                 return
 
         await self._ctx.fire_hook('on_rollback', datasource=datasource, from_level=from_level, to_level=to_level)

From 78f6e92b34477941788afb633f6b8e56c203a936 Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Fri, 1 Oct 2021 16:20:27 +0300
Subject: [PATCH 02/29] Docs

---
 src/dipdup/datasources/tzkt/datasource.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py
index 76a0b7de8..80c6bc355 100644
--- a/src/dipdup/datasources/tzkt/datasource.py
+++ b/src/dipdup/datasources/tzkt/datasource.py
@@ -673,9 +673,10 @@ async def _extract_message_data(self, type_: MessageType, message: List[Any]) ->
             elif tzkt_type == TzktMessageType.REORG:
                 if last_level is None:
                     raise RuntimeError('Reorg message received but level is not set')
-                # NOTE:
+                # NOTE: operation/big_map channels have their own levels
                 if type_ == MessageType.head:
                     return
+
                 self._logger.info('Emitting rollback from %s to %s', last_level, level)
                 await self.emit_rollback(last_level, level)

From 71f864859db73618763ecf4a60bd6ebeb275015a Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Fri, 1 Oct 2021 16:50:41 +0300
Subject: [PATCH 03/29] Typo

---
 src/dipdup/datasources/tzkt/datasource.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py
index 80c6bc355..b62c19491 100644
--- a/src/dipdup/datasources/tzkt/datasource.py
+++ b/src/dipdup/datasources/tzkt/datasource.py
@@ -310,7 +310,7 @@ def __init__(
         self._big_map_subscriptions: Dict[str, Set[str]] = {}
         self._ws_client: Optional[BaseHubConnection] = None
-        self._level: DefaultDict[MessageType, Optional[int]] = defaultdict(None)
+        self._level: DefaultDict[MessageType, Optional[int]] = defaultdict(lambda: None)
         self._sync_level: Optional[int] = None
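The one-character fix in patch 03 is easy to miss, so here is a standalone snippet, not part of the patch series, showing why `defaultdict(None)` is a bug: passing None sets no default_factory at all, so the per-channel level lookup added in patch 01 would raise KeyError for a channel that has not seen a message yet instead of returning None.

    from collections import defaultdict

    broken = defaultdict(None)  # same as a plain dict: no default_factory
    try:
        broken['head']
    except KeyError:
        print('KeyError: None disables the factory instead of acting as a default')

    fixed = defaultdict(lambda: None)  # what the patch intends
    print(fixed['head'])               # None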
From f3cbc89185d6237e842a06ec1d9180fb6a6aed2e Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Fri, 1 Oct 2021 18:03:18 +0300
Subject: [PATCH 04/29] Changelog, fix level check

---
 CHANGELOG.md        | 6 +++++-
 src/dipdup/index.py | 9 +++++----
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f4e5f284d..c5d291be8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,11 +2,15 @@
 
 ## [unreleased]
 
+### Fixed
+
+* Fixed possible race condition during single level rollback.
+
 ## 3.0.3 - 2021-10-01
 
 ### Fixed
 
-* Fixed processing of single level rollbacks emitted before rolled back head
+* Fixed processing of single level rollbacks emitted before rolled back head.
 
 ## 3.0.2 - 2021-09-30
 
diff --git a/src/dipdup/index.py b/src/dipdup/index.py
index 84a32765a..bb806870b 100644
--- a/src/dipdup/index.py
+++ b/src/dipdup/index.py
@@ -145,7 +145,7 @@ async def single_level_rollback(self, from_level: int) -> None:
         Called by IndexDispatcher in case index datasource reported a rollback.
         """
         if self._rollback_level:
-            raise RuntimeError('Already in rollback state')
+            raise RuntimeError('Index is already in rollback state')
 
         if self.state.level < from_level:
             self._logger.info('Index level is lower than rollback level, ignoring')
@@ -196,9 +196,6 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
         await self._exit_sync_state(last_level)
 
     async def _process_level_operations(self, level: int, operations: List[OperationData]) -> None:
-        if level <= self.state.level:
-            raise RuntimeError(f'Level of operation batch must be higher than index state level: {level} <= {self.state.level}')
-
         if self._rollback_level:
             levels = {
                 'operations': level,
@@ -223,6 +220,10 @@ async def _process_level_operations(self, level: int, operations: List[Operation
                 return
             operations = [op for op in operations if op.hash in new_hashes]
 
+        # NOTE: le intended since it's not a single level rollback
+        elif level <= self.state.level:
+            raise RuntimeError(f'Level of operation batch must be higher than index state level: {level} <= {self.state.level}')
+
         async with in_global_transaction():
             self._logger.info('Processing %s operations of level %s', len(operations), level)
             await self._process_operations(operations)

From d64eeba67a6a2574154dae9526a273d66d0e3487 Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Fri, 1 Oct 2021 20:34:11 +0300
Subject: [PATCH 05/29] Rollback tests WIP

---
 src/dipdup/dipdup.py                      |   7 +-
 src/dipdup/index.py                       |   1 +
 tests/integration_tests/test_rollback.py  | 100 +++++++----------------
 3 files changed, 33 insertions(+), 75 deletions(-)

diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py
index 5e436cf4f..7841b6aa1 100644
--- a/src/dipdup/dipdup.py
+++ b/src/dipdup/dipdup.py
@@ -64,7 +64,7 @@ async def run(
             while self._tasks:
                 tasks.append(self._tasks.popleft())
 
-            async with slowdown(1.0):
+            async with slowdown(0.1):
                 await gather(*tasks)
 
             indexes_spawned = False
@@ -170,11 +170,12 @@ async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapDa
 
     async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
         if from_level - to_level == 1:
-            # NOTE: Single level rollbacks are processed at Index level.
+            self._logger.info('Attempting a single level rollback')
             # NOTE: Notify all indexes which use rolled back datasource to drop duplicated operations from the next block
             indexes = iter(self._indexes.values())
-            matching_indexes = filter(lambda index: index.datasource == datasource, indexes)
+            matching_indexes = tuple(filter(lambda index: index.datasource == datasource, indexes))
             all_indexes_are_operation = all(isinstance(index, OperationIndex) for index in matching_indexes)
+            self._logger.info('Indexes: %s total, %s matching, all are `operation` is %s', len(self._indexes), len(matching_indexes), all_indexes_are_operation)
 
             if all_indexes_are_operation:
                 for index in matching_indexes:

diff --git a/src/dipdup/index.py b/src/dipdup/index.py
index bb806870b..07cddcc9d 100644
--- a/src/dipdup/index.py
+++ b/src/dipdup/index.py
@@ -148,6 +148,7 @@ async def single_level_rollback(self, from_level: int) -> None:
             raise RuntimeError('Index is already in rollback state')
 
         if self.state.level < from_level:
+            print(self.state.level, from_level)
             self._logger.info('Index level is lower than rollback level, ignoring')
         elif self.state.level == from_level:
             self._logger.info('Single level rollback has been triggered')

diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py
index a16ff7b4a..aafa9a42f 100644
--- a/tests/integration_tests/test_rollback.py
+++ b/tests/integration_tests/test_rollback.py
@@ -1,6 +1,7 @@
 import asyncio
 from datetime import datetime
 from functools import partial
+import logging
 from os.path import dirname, join
 from types import MethodType
 from unittest import IsolatedAsyncioTestCase, skip
@@ -15,6 +16,8 @@
 from dipdup.models import OperationData
 
 
+logging.basicConfig(level=logging.INFO)
+
+
 def _get_operation(hash_: str, level: int) -> OperationData:
     return OperationData(
         storage={},
@@ -36,22 +39,25 @@ def _get_operation(hash_: str, level: int) -> OperationData:
 
 # NOTE: Skip synchronization
 async def operation_index_process(self: OperationIndex):
-    await self.initialize_state()
     await self._process_queue()
 
 
-# NOTE: Emit operations, rollback, emit again, check state
-async def datasource_run(self: TzktDatasource, index_dispatcher: IndexDispatcher, fail=False):
-
-    old_block = MagicMock(spec=HeadBlockData)
-    old_block.hash = 'block_a'
-    old_block.level = 1365001
-    old_block.timestamp = datetime(2018, 1, 1)
-    new_block = MagicMock(spec=HeadBlockData)
-    new_block.hash = 'block_b'
-    new_block.level = 1365001
-    new_block.timestamp = datetime(2018, 1, 1)
+old_block = MagicMock(spec=HeadBlockData)
+old_block.hash = 'block_a'
+old_block.level = 1365001
+old_block.timestamp = datetime(2018, 1, 1)
+new_block = MagicMock(spec=HeadBlockData)
+new_block.hash = 'block_b'
+new_block.level = 1365001
+new_block.timestamp = datetime(2018, 1, 1)
+
+
+# _sleep = 0.2
+_sleep = 0
+
 
+# NOTE: Emit operations, rollback, emit again, check state
+async def datasource_run(self: TzktDatasource):
     await self.emit_operations(
         [
             _get_operation('1', 1365001),
             _get_operation('2', 1365001),
             _get_operation('3', 1365001),
         ],
     )
-    await asyncio.sleep(0.05)
+    await asyncio.sleep(_sleep)
     await self.emit_rollback(
         from_level=1365001,
         to_level=1365000,
     )
-    await asyncio.sleep(0.05)
-    self.emit_operations(
+
+    await asyncio.sleep(_sleep)
+    await self.emit_operations(
         [
             _get_operation('1', 1365001),
             _get_operation('2', 1365001),
+            _get_operation('3', 1365001),
         ]
-        + (
-            [
-                _get_operation('3', 1365001),
-            ]
-            if not fail
-            else []
-        ),
     )
-    await asyncio.sleep(0.05)
-    index_dispatcher.stop()
+    await asyncio.sleep(_sleep)
 
     # Assert
     state = await State.filter(name='hen_mainnet').get()
-    assert state.level == 1365001
+    assert state.level == 1365001, state.level
+
+    raise asyncio.CancelledError
 
 
-@skip('RuntimeError: Index is synchronized but has no head block data')
 class RollbackTest(IsolatedAsyncioTestCase):
     async def test_rollback_ok(self):
         # Arrange
         config = DipDupConfig.load([join(dirname(__file__), 'hic_et_nunc.yml')])
         config.database.path = ':memory:'
+        config.initialize()
 
-        datasource_name, datasource_config = list(config.datasources.items())[0]
-        datasource = TzktDatasource('test')
         dipdup = DipDup(config)
-        dipdup._datasources[datasource_name] = datasource
-        dipdup._datasources_by_config[datasource_config] = datasource
-
-        initial_block = MagicMock(spec=BlockData)
-        initial_block.level = 0
-        initial_block.hash = 'block_0'
-
-        datasource.on_operations(dipdup._index_dispatcher._on_operations)
-        datasource.on_big_maps(dipdup._index_dispatcher._on_big_maps)
-        datasource.on_rollback(dipdup._index_dispatcher._on_rollback)
-
-        datasource.run = MethodType(partial(datasource_run, index_dispatcher=dipdup._index_dispatcher), datasource)
-        datasource.get_block = AsyncMock(return_value=initial_block)
 
         # Act
         with patch('dipdup.index.OperationIndex.process', operation_index_process):
-            with patch('dipdup.dipdup.INDEX_DISPATCHER_INTERVAL', 0.01):
-                await dipdup.run(False, False)
-
-    async def test_rollback_fail(self):
-        # Arrange
-        config = DipDupConfig.load([join(dirname(__file__), 'hic_et_nunc.yml')])
-        config.database.path = ':memory:'
-
-        datasource_name, datasource_config = list(config.datasources.items())[0]
-        datasource = TzktDatasource('test')
-        dipdup = DipDup(config)
-        dipdup._datasources[datasource_name] = datasource
-        dipdup._datasources_by_config[datasource_config] = datasource
-        dipdup._ctx.reindex = AsyncMock()
-
-        initial_block = MagicMock(spec=BlockData)
-        initial_block.level = 0
-        initial_block.hash = 'block_0'
-
-        datasource.on_operations(dipdup._index_dispatcher._on_operations)
-        datasource.on_big_maps(dipdup._index_dispatcher._on_big_maps)
-        datasource.on_rollback(dipdup._index_dispatcher._on_rollback)
-
-        datasource.run = MethodType(partial(datasource_run, index_dispatcher=dipdup._index_dispatcher, fail=True), datasource)
-        datasource.get_block = AsyncMock(return_value=initial_block)
-
-        # Act
-        with patch('dipdup.index.OperationIndex.process', operation_index_process):
-            with patch('dipdup.dipdup.INDEX_DISPATCHER_INTERVAL', 0.01):
-                await dipdup.run(False, False)
-
-        dipdup._ctx.reindex.assert_awaited()
+            with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run):
+                await dipdup.run(False, False, False)
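Besides wiring up the new test, patch 05 materializes `matching_indexes` into a tuple. The patch 01 version kept it as a lazy `filter` object, which `all()` consumed before the rollback loop ever ran. A minimal standalone demonstration of that pitfall, with strings standing in for index objects:

    indexes = ['operation_index_a', 'operation_index_b']

    lazy = filter(lambda name: name.startswith('operation'), indexes)
    print(all(n.startswith('operation') for n in lazy))  # True, but exhausts the iterator
    print(list(lazy))                                    # [] -- nothing left to roll back

    materialized = tuple(filter(lambda name: name.startswith('operation'), indexes))
    print(all(n.startswith('operation') for n in materialized))  # True
    print(list(materialized))                                    # both items still present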
"pytest (>=4.6.2)", "pytest-cov (>=2.7.1)", "Sphinx (>=2.2.1)", "sphinx-autobuild (>=0.7.1)", "sphinx-rtd-theme (>=0.4.3)", "black (>=19.10b0)", "isort (>=5.1.1)"] +[[package]] +name = "lru-dict" +version = "1.1.7" +description = "An Dict like LRU container." +category = "main" +optional = false +python-versions = "*" + [[package]] name = "markupsafe" version = "2.0.1" @@ -2063,7 +2071,7 @@ pytezos = ["pytezos"] [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "d52d40cdb12cc2a558a1b9ad841775e1406334232ad944963e31e4287ae1e7d3" +content-hash = "8ee705e13e2e482ae02d8177c6789696921d01f403159ac39e8e31aa61bf9048" [metadata.files] aiohttp = [ @@ -2513,6 +2521,9 @@ loguru = [ {file = "loguru-0.5.3-py3-none-any.whl", hash = "sha256:f8087ac396b5ee5f67c963b495d615ebbceac2796379599820e324419d53667c"}, {file = "loguru-0.5.3.tar.gz", hash = "sha256:b28e72ac7a98be3d28ad28570299a393dfcd32e5e3f6a353dec94675767b6319"}, ] +lru-dict = [ + {file = "lru-dict-1.1.7.tar.gz", hash = "sha256:45b81f67d75341d4433abade799a47e9c42a9e22a118531dcb5e549864032d7c"}, +] markupsafe = [ {file = "MarkupSafe-2.0.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:d8446c54dc28c01e5a2dbac5a25f071f6653e6e40f3a8818e8b45d790fe6ef53"}, {file = "MarkupSafe-2.0.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:36bc903cbb393720fad60fc28c10de6acf10dc6cc883f3e24ee4012371399a38"}, diff --git a/pyproject.toml b/pyproject.toml index c42a5e8bc..4afebbac6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ pytezos = {version = "^3.2.4", optional = true} asyncclick = "^8.0.1" anyio = "^3.2.1" sqlparse = "^0.4.1" +lru-dict = "^1.1.7" [tool.poetry.dev-dependencies] black = "^20.8b1" diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py index b62c19491..294568fe8 100644 --- a/src/dipdup/datasources/tzkt/datasource.py +++ b/src/dipdup/datasources/tzkt/datasource.py @@ -653,6 +653,7 @@ async def _subscribe_to_head(self) -> None: ) async def _extract_message_data(self, type_: MessageType, message: List[Any]) -> AsyncGenerator[Dict, None]: + """""" # TODO: Docstring for item in message: tzkt_type = TzktMessageType(item['type']) diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 7841b6aa1..891d98032 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -5,7 +5,7 @@ from contextlib import AsyncExitStack, asynccontextmanager, suppress from functools import partial from operator import ne -from typing import Awaitable, Deque, Dict, List, Optional, Set, cast +from typing import Awaitable, Deque, Dict, List, Optional, Set, Tuple from apscheduler.events import EVENT_JOB_ERROR # type: ignore from tortoise.exceptions import OperationalError @@ -31,7 +31,7 @@ from dipdup.enums import ReindexingReason from dipdup.exceptions import ConfigInitializationException, DipDupException from dipdup.hasura import HasuraGateway -from dipdup.index import BigMapIndex, Index, OperationIndex +from dipdup.index import BigMapIndex, Index, OperationIndex, block_cache from dipdup.models import BigMapData, Contract, Head, HeadBlockData from dipdup.models import Index as IndexState from dipdup.models import IndexStatus, OperationData, Schema @@ -139,6 +139,8 @@ async def _load_index_states(self) -> None: else: self._logger.warning('Index `%s` was removed from config, ignoring', name) + block_cache.clear() + async def _on_head(self, datasource: TzktDatasource, head: HeadBlockData) -> None: # NOTE: Do not await query results - blocked database connection may cause 
Websocket timeout. self._tasks.append( @@ -155,31 +157,38 @@ async def _on_head(self, datasource: TzktDatasource, head: HeadBlockData) -> Non ) async def _on_operations(self, datasource: TzktDatasource, operations: List[OperationData]) -> None: + # FIXME: set assert len(set(op.level for op in operations)) == 1 level = operations[0].level for index in self._indexes.values(): if isinstance(index, OperationIndex) and index.datasource == datasource: - index.push(level, operations) + index.push_operations(operations) async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData]) -> None: + # FIXME: set assert len(set(op.level for op in big_maps)) == 1 level = big_maps[0].level for index in self._indexes.values(): if isinstance(index, BigMapIndex) and index.datasource == datasource: - index.push(level, big_maps) + index.push_big_maps(level, big_maps) async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None: if from_level - to_level == 1: self._logger.info('Attempting a single level rollback') # NOTE: Notify all indexes which use rolled back datasource to drop duplicated operations from the next block indexes = iter(self._indexes.values()) - matching_indexes = tuple(filter(lambda index: index.datasource == datasource, indexes)) + matching_indexes: Tuple[OperationIndex] = tuple(filter(lambda index: index.datasource == datasource, indexes)) all_indexes_are_operation = all(isinstance(index, OperationIndex) for index in matching_indexes) - self._logger.info('Indexes: %s total, %s matching, all are `operation` is %s', len(self._indexes), len(matching_indexes), all_indexes_are_operation) + self._logger.info( + 'Indexes: %s total, %s matching, all are `operation` is %s', + len(self._indexes), + len(matching_indexes), + all_indexes_are_operation, + ) if all_indexes_are_operation: for index in matching_indexes: - await cast(OperationIndex, index).single_level_rollback(from_level) + await index.single_level_rollback(from_level) return await self._ctx.fire_hook('on_rollback', datasource=datasource, from_level=from_level, to_level=to_level) diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 07cddcc9d..4b7eccfe5 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -2,6 +2,7 @@ from collections import defaultdict, deque, namedtuple from typing import Deque, Dict, List, Optional, Set, Tuple, Union, cast +from lru import LRU from pydantic.error_wrappers import ValidationError import dipdup.models as models @@ -27,7 +28,13 @@ # NOTE: Operations of a single contract call OperationSubgroup = namedtuple('OperationSubgroup', ('hash', 'counter')) -_cached_blocks: Dict[int, BlockData] = {} +# NOTE: Message queue of OperationIndex +SingleLevelRollback = namedtuple('SingleLevelRollback', ('level')) +Operations = List[OperationData] +OperationQueueItemT = Union[Operations, SingleLevelRollback] + +# NOTE: For initializing the index state on startup +block_cache: Dict[int, BlockData] = LRU(10) class Index: @@ -73,9 +80,9 @@ async def initialize_state(self) -> None: if not head: return - if head.level not in _cached_blocks: - _cached_blocks[head.level] = await self.datasource.get_block(head.level) - if head.hash != _cached_blocks[head.level].hash: + if head.level not in block_cache: + block_cache[head.level] = await self.datasource.get_block(head.level) + if head.hash != block_cache[head.level].hash: await self._ctx.reindex(ReindexingReason.BLOCK_HASH_MISMATCH) async def process(self) -> None: @@ -130,29 +137,32 @@ class OperationIndex(Index): 
def __init__(self, ctx: DipDupContext, config: OperationIndexConfig, datasource: TzktDatasource) -> None: super().__init__(ctx, config, datasource) - self._queue: Deque[Tuple[int, List[OperationData]]] = deque() + self._queue: Deque[OperationQueueItemT] = deque() self._contract_hashes: Dict[str, Tuple[int, int]] = {} self._rollback_level: Optional[int] = None - self._last_hashes: Set[str] = set() + self._head_hashes: Set[str] = set() self._migration_originations: Optional[Dict[str, OperationData]] = None - def push(self, level: int, operations: List[OperationData]) -> None: - self._queue.append((level, operations)) + def push_operations(self, operations: List[OperationData]) -> None: + self._queue.append(operations) + + def push_rollback(self, level: int) -> None: + self._queue.append(SingleLevelRollback(level)) - async def single_level_rollback(self, from_level: int) -> None: - """Ensure next arrived block is the same as rolled back one + async def single_level_rollback(self, level: int) -> None: + """Ensure next arrived block has all operations of the previous block. But it could also contain additional operations. - Called by IndexDispatcher in case index datasource reported a rollback. + Called by IndexDispatcher when index datasource receive a single level rollback. """ if self._rollback_level: raise RuntimeError('Index is already in rollback state') - if self.state.level < from_level: - print(self.state.level, from_level) + state_level = cast(int, self.state.level) + if state_level < level: self._logger.info('Index level is lower than rollback level, ignoring') - elif self.state.level == from_level: + elif state_level == level: self._logger.info('Single level rollback has been triggered') - self._rollback_level = from_level + self._rollback_level = level else: raise RuntimeError('Index level is higher than rollback level') @@ -161,8 +171,13 @@ async def _process_queue(self) -> None: if self._queue: self._logger.info('Processing websocket queue') while self._queue: - level, operations = self._queue.popleft() - await self._process_level_operations(level, operations) + message = self._queue.popleft() + if isinstance(message, SingleLevelRollback): + await self.single_level_rollback(message.level) + elif isinstance(message, list): + await self._process_operations(message) + else: + raise RuntimeError async def _synchronize(self, last_level: int, cache: bool = False) -> None: """Fetch operations via Fetcher and pass to message callback""" @@ -191,12 +206,17 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None: migration_originations=migration_originations, ) - async for level, operations in fetcher.fetch_operations_by_level(): - await self._process_level_operations(level, operations) + async for _, operations in fetcher.fetch_operations_by_level(): + await self._process_level_operations(operations) await self._exit_sync_state(last_level) - async def _process_level_operations(self, level: int, operations: List[OperationData]) -> None: + async def _process_level_operations(self, operations: List[OperationData]) -> None: + batch_levels = tuple(set(operation.level for operation in operations)) + if len(batch_levels) != 1: + raise RuntimeError(f'Operations in batch have different levels: {batch_levels}') + level = tuple(batch_levels)[0] + if self._rollback_level: levels = { 'operations': level, @@ -208,14 +228,14 @@ async def _process_level_operations(self, level: int, operations: List[Operation raise RuntimeError(f'Index is in a rollback state, but received operation 
batch with different levels: {levels_repr}') self._logger.info('Rolling back to previous level, verifying processed operations') - expected_hashes = set(self._last_hashes) + expected_hashes = set(self._head_hashes) received_hashes = set([op.hash for op in operations]) reused_hashes = received_hashes & expected_hashes if reused_hashes != expected_hashes: await self._ctx.reindex(ReindexingReason.ROLLBACK) self._rollback_level = None - self._last_hashes = set() + self._head_hashes = set() new_hashes = received_hashes - expected_hashes if not new_hashes: return @@ -265,12 +285,12 @@ async def _match_operation(self, pattern_config: OperationHandlerPatternConfigT, async def _process_operations(self, operations: List[OperationData]) -> None: """Try to match operations in cache with all patterns from indexes. Must be wrapped in transaction.""" - self._last_hashes = set() + self._head_hashes = set() operation_subgroups: Dict[OperationSubgroup, List[OperationData]] = defaultdict(list) for operation in operations: key = OperationSubgroup(operation.hash, operation.counter) operation_subgroups[key].append(operation) - self._last_hashes.add(operation.hash) + self._head_hashes.add(operation.hash) for operation_subgroup, operations in operation_subgroups.items(): self._logger.debug('Matching %s', key) diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py index aafa9a42f..0b1b4d041 100644 --- a/tests/integration_tests/test_rollback.py +++ b/tests/integration_tests/test_rollback.py @@ -1,23 +1,21 @@ import asyncio -from datetime import datetime -from functools import partial import logging +from datetime import datetime from os.path import dirname, join -from types import MethodType -from unittest import IsolatedAsyncioTestCase, skip -from unittest.mock import AsyncMock, MagicMock, patch +from unittest import IsolatedAsyncioTestCase +from unittest.mock import MagicMock, patch from dipdup.config import DipDupConfig from dipdup.datasources.tzkt.datasource import TzktDatasource -from dipdup.dipdup import DipDup, IndexDispatcher +from dipdup.dipdup import DipDup from dipdup.index import OperationIndex -from dipdup.models import BlockData, HeadBlockData +from dipdup.models import HeadBlockData from dipdup.models import Index as State from dipdup.models import OperationData - logging.basicConfig(level=logging.INFO) + def _get_operation(hash_: str, level: int) -> OperationData: return OperationData( storage={}, From 4e075214e97c0345057422f818bfa91d610e9b41 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 08:31:54 +0300 Subject: [PATCH 07/29] Refactor OperationIndex --- poetry.lock | 13 +------------ pyproject.toml | 1 - src/dipdup/dipdup.py | 10 ++-------- src/dipdup/index.py | 28 +++++++++++++++++----------- 4 files changed, 20 insertions(+), 32 deletions(-) diff --git a/poetry.lock b/poetry.lock index f5d337501..46704fe51 100644 --- a/poetry.lock +++ b/poetry.lock @@ -917,14 +917,6 @@ win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""} [package.extras] dev = ["codecov (>=2.0.15)", "colorama (>=0.3.4)", "flake8 (>=3.7.7)", "tox (>=3.9.0)", "tox-travis (>=0.12)", "pytest (>=4.6.2)", "pytest-cov (>=2.7.1)", "Sphinx (>=2.2.1)", "sphinx-autobuild (>=0.7.1)", "sphinx-rtd-theme (>=0.4.3)", "black (>=19.10b0)", "isort (>=5.1.1)"] -[[package]] -name = "lru-dict" -version = "1.1.7" -description = "An Dict like LRU container." 
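Patch 06 replaces the per-level tuples in `OperationIndex._queue` with a single queue that can also carry `SingleLevelRollback` markers, so rollbacks are processed strictly in order with regular operation batches. A simplified model of that dispatch loop, with plain dicts standing in for `OperationData`:

    from collections import deque, namedtuple
    from typing import Deque, List, Union

    SingleLevelRollback = namedtuple('SingleLevelRollback', ('level',))
    QueueItem = Union[List[dict], SingleLevelRollback]

    queue: Deque[QueueItem] = deque()
    queue.append([{'hash': '1', 'level': 1365001}])
    queue.append(SingleLevelRollback(1365001))

    while queue:
        message = queue.popleft()
        if isinstance(message, SingleLevelRollback):
            print('rollback requested at level', message.level)
        elif isinstance(message, list):
            print('operation batch of', len(message), 'items')
        else:
            raise RuntimeError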
-category = "main" -optional = false -python-versions = "*" - [[package]] name = "markupsafe" version = "2.0.1" @@ -2071,7 +2063,7 @@ pytezos = ["pytezos"] [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "8ee705e13e2e482ae02d8177c6789696921d01f403159ac39e8e31aa61bf9048" +content-hash = "d52d40cdb12cc2a558a1b9ad841775e1406334232ad944963e31e4287ae1e7d3" [metadata.files] aiohttp = [ @@ -2521,9 +2513,6 @@ loguru = [ {file = "loguru-0.5.3-py3-none-any.whl", hash = "sha256:f8087ac396b5ee5f67c963b495d615ebbceac2796379599820e324419d53667c"}, {file = "loguru-0.5.3.tar.gz", hash = "sha256:b28e72ac7a98be3d28ad28570299a393dfcd32e5e3f6a353dec94675767b6319"}, ] -lru-dict = [ - {file = "lru-dict-1.1.7.tar.gz", hash = "sha256:45b81f67d75341d4433abade799a47e9c42a9e22a118531dcb5e549864032d7c"}, -] markupsafe = [ {file = "MarkupSafe-2.0.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:d8446c54dc28c01e5a2dbac5a25f071f6653e6e40f3a8818e8b45d790fe6ef53"}, {file = "MarkupSafe-2.0.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:36bc903cbb393720fad60fc28c10de6acf10dc6cc883f3e24ee4012371399a38"}, diff --git a/pyproject.toml b/pyproject.toml index 4afebbac6..c42a5e8bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,6 @@ pytezos = {version = "^3.2.4", optional = true} asyncclick = "^8.0.1" anyio = "^3.2.1" sqlparse = "^0.4.1" -lru-dict = "^1.1.7" [tool.poetry.dev-dependencies] black = "^20.8b1" diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 891d98032..b8ff79b1d 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -157,27 +157,21 @@ async def _on_head(self, datasource: TzktDatasource, head: HeadBlockData) -> Non ) async def _on_operations(self, datasource: TzktDatasource, operations: List[OperationData]) -> None: - # FIXME: set - assert len(set(op.level for op in operations)) == 1 - level = operations[0].level for index in self._indexes.values(): if isinstance(index, OperationIndex) and index.datasource == datasource: index.push_operations(operations) async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData]) -> None: - # FIXME: set - assert len(set(op.level for op in big_maps)) == 1 - level = big_maps[0].level for index in self._indexes.values(): if isinstance(index, BigMapIndex) and index.datasource == datasource: - index.push_big_maps(level, big_maps) + index.push_big_maps(big_maps) async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None: if from_level - to_level == 1: self._logger.info('Attempting a single level rollback') # NOTE: Notify all indexes which use rolled back datasource to drop duplicated operations from the next block indexes = iter(self._indexes.values()) - matching_indexes: Tuple[OperationIndex] = tuple(filter(lambda index: index.datasource == datasource, indexes)) + matching_indexes: Tuple[OperationIndex] = tuple(filter(lambda index: index.datasource == datasource, indexes)) # type: ignore all_indexes_are_operation = all(isinstance(index, OperationIndex) for index in matching_indexes) self._logger.info( 'Indexes: %s total, %s matching, all are `operation` is %s', diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 4b7eccfe5..830cc0235 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -2,7 +2,6 @@ from collections import defaultdict, deque, namedtuple from typing import Deque, Dict, List, Optional, Set, Tuple, Union, cast -from lru import LRU from pydantic.error_wrappers import ValidationError import dipdup.models as models @@ -34,7 
+33,7 @@ OperationQueueItemT = Union[Operations, SingleLevelRollback] # NOTE: For initializing the index state on startup -block_cache: Dict[int, BlockData] = LRU(10) +block_cache: Dict[int, BlockData] = {} class Index: @@ -159,7 +158,7 @@ async def single_level_rollback(self, level: int) -> None: state_level = cast(int, self.state.level) if state_level < level: - self._logger.info('Index level is lower than rollback level, ignoring') + self._logger.info('Index level is lower than rollback level, ignoring: %s < %s', state_level, level) elif state_level == level: self._logger.info('Single level rollback has been triggered') self._rollback_level = level @@ -212,6 +211,7 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None: await self._exit_sync_state(last_level) async def _process_level_operations(self, operations: List[OperationData]) -> None: + # TODO: extract_level batch_levels = tuple(set(operation.level for operation in operations)) if len(batch_levels) != 1: raise RuntimeError(f'Operations in batch have different levels: {batch_levels}') @@ -427,18 +427,18 @@ class BigMapIndex(Index): def __init__(self, ctx: DipDupContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None: super().__init__(ctx, config, datasource) - self._queue: Deque[Tuple[int, List[BigMapData]]] = deque() + self._queue: Deque[List[BigMapData]] = deque() - def push(self, level: int, big_maps: List[BigMapData]): - self._queue.append((level, big_maps)) + def push_big_maps(self, big_maps: List[BigMapData]): + self._queue.append(big_maps) async def _process_queue(self) -> None: """Process WebSocket queue""" if self._queue: self._logger.info('Processing websocket queue') while self._queue: - level, big_maps = self._queue.popleft() - await self._process_level_big_maps(level, big_maps) + big_maps = self._queue.popleft() + await self._process_level_big_maps(big_maps) async def _synchronize(self, last_level: int, cache: bool = False) -> None: """Fetch operations via Fetcher and pass to message callback""" @@ -460,12 +460,18 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None: cache=cache, ) - async for level, big_maps in fetcher.fetch_big_maps_by_level(): - await self._process_level_big_maps(level, big_maps) + async for _, big_maps in fetcher.fetch_big_maps_by_level(): + await self._process_level_big_maps(big_maps) await self._exit_sync_state(last_level) - async def _process_level_big_maps(self, level: int, big_maps: List[BigMapData]): + async def _process_level_big_maps(self, big_maps: List[BigMapData]): + # TODO: extract_level + batch_levels = tuple(set(big_map.level for big_map in big_maps)) + if len(batch_levels) != 1: + raise RuntimeError(f'Operations in batch have different levels: {batch_levels}') + level = tuple(batch_levels)[0] + if level <= self.state.level: raise RuntimeError(f'Level of big map batch must be higher than index state level: {level} <= {self.state.level}') From e881efa5c230cea4d12d2839f3491ed54ec9c6ac Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 09:21:13 +0300 Subject: [PATCH 08/29] Fix tests --- src/dipdup/cli.py | 1 + src/dipdup/index.py | 2 +- tests/integration_tests/test_rollback.py | 28 +++++------------------- 3 files changed, 7 insertions(+), 24 deletions(-) diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py index ff2645b34..c45ec73e7 100644 --- a/src/dipdup/cli.py +++ b/src/dipdup/cli.py @@ -113,6 +113,7 @@ async def cli(ctx, config: List[str], env_file: List[str], logging_config: str): _config = 
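Patch 07 drops the short-lived lru-dict dependency again: `block_cache` only needs to deduplicate `get_block` calls across indexes during startup and is cleared by `_load_index_states` right afterwards, so a plain dict suffices. The idea in isolation, where `fetch_block` is a hypothetical stand-in for the TzKT REST call:

    block_cache = {}
    rest_calls = 0

    def fetch_block(level: int) -> str:
        global rest_calls
        rest_calls += 1          # pretend this is a TzKT REST request
        return f'block_{level}'

    def get_block_cached(level: int) -> str:
        if level not in block_cache:
            block_cache[level] = fetch_block(level)
        return block_cache[level]

    get_block_cached(1365001)    # first index fetches the block
    get_block_cached(1365001)    # second index reuses it
    print(rest_calls)            # 1
    block_cache.clear()          # dispatcher drops the cache after startup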
From e881efa5c230cea4d12d2839f3491ed54ec9c6ac Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Sat, 2 Oct 2021 09:21:13 +0300
Subject: [PATCH 08/29] Fix tests

---
 src/dipdup/cli.py                         |  1 +
 src/dipdup/index.py                       |  2 +-
 tests/integration_tests/test_rollback.py  | 28 +++++------------------
 3 files changed, 7 insertions(+), 24 deletions(-)

diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py
index ff2645b34..c45ec73e7 100644
--- a/src/dipdup/cli.py
+++ b/src/dipdup/cli.py
@@ -113,6 +113,7 @@ async def cli(ctx, config: List[str], env_file: List[str], logging_config: str):
     _config = DipDupConfig.load(config)
     init_sentry(_config)
 
+    # TODO: Raise InitRequiredError on failure
     await DipDupCodeGenerator(_config, {}).create_package()
 
     if _config.spec_version not in spec_version_mapping:

diff --git a/src/dipdup/index.py b/src/dipdup/index.py
index 830cc0235..699d79799 100644
--- a/src/dipdup/index.py
+++ b/src/dipdup/index.py
@@ -163,7 +163,7 @@ async def single_level_rollback(self, level: int) -> None:
             self._logger.info('Single level rollback has been triggered')
             self._rollback_level = level
         else:
-            raise RuntimeError('Index level is higher than rollback level')
+            raise RuntimeError(f'Index level is higher than rollback level: {state_level} > {level}')

diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py
index 0b1b4d041..88e4d13e0 100644
--- a/tests/integration_tests/test_rollback.py
+++ b/tests/integration_tests/test_rollback.py
@@ -1,5 +1,4 @@
 import asyncio
-import logging
 from datetime import datetime
 from os.path import dirname, join
 from unittest import IsolatedAsyncioTestCase
@@ -8,13 +7,10 @@
 from dipdup.config import DipDupConfig
 from dipdup.datasources.tzkt.datasource import TzktDatasource
 from dipdup.dipdup import DipDup
-from dipdup.index import OperationIndex
 from dipdup.models import HeadBlockData
 from dipdup.models import Index as State
 from dipdup.models import OperationData
 
-logging.basicConfig(level=logging.INFO)
-
 
 def _get_operation(hash_: str, level: int) -> OperationData:
     return OperationData(
@@ -35,11 +31,7 @@
     )
 
 
-# NOTE: Skip synchronization
-async def operation_index_process(self: OperationIndex):
-    await self._process_queue()
-
-
+# NOTE: Same level, different hash
 old_block = MagicMock(spec=HeadBlockData)
 old_block.hash = 'block_a'
 old_block.level = 1365001
@@ -50,10 +42,6 @@
 new_block.timestamp = datetime(2018, 1, 1)
 
 
-# _sleep = 0.2
-_sleep = 0
-
-
 # NOTE: Emit operations, rollback, emit again, check state
 async def datasource_run(self: TzktDatasource):
     await self.emit_operations(
@@ -63,14 +51,10 @@ async def datasource_run(self: TzktDatasource):
             _get_operation('3', 1365001),
         ],
     )
-    await asyncio.sleep(_sleep)
     await self.emit_rollback(
         from_level=1365001,
         to_level=1365000,
    )
-
-    await asyncio.sleep(_sleep)
     await self.emit_operations(
         [
             _get_operation('1', 1365001),
             _get_operation('2', 1365001),
             _get_operation('3', 1365001),
         ]
     )
-    await asyncio.sleep(_sleep)
-
-    # Assert
+    # NOTE: Assert while Tortoise is still running
     state = await State.filter(name='hen_mainnet').get()
     assert state.level == 1365001, state.level
 
     raise asyncio.CancelledError
@@ -93,11 +75,11 @@ async def test_rollback_ok(self):
         # Arrange
         config = DipDupConfig.load([join(dirname(__file__), 'hic_et_nunc.yml')])
         config.database.path = ':memory:'
+        config.indexes['hen_mainnet'].last_level = config.indexes['hen_mainnet'].first_level + 1
         config.initialize()
 
         dipdup = DipDup(config)
 
         # Act
-        with patch('dipdup.index.OperationIndex.process', operation_index_process):
-            with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run):
-                await dipdup.run(False, False, False)
+        with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run):
+            await dipdup.run(False, False, False)
From 169ca3fcfc1b2d38a062927288de7bfdf3ff78c1 Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Sat, 2 Oct 2021 15:58:13 +0300
Subject: [PATCH 09/29] Refactoring

---
 src/dipdup/datasources/tzkt/datasource.py | 74 ++++++-----------------
 src/dipdup/datasources/tzkt/enums.py      | 45 ++++++++++++++
 2 files changed, 63 insertions(+), 56 deletions(-)

diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py
index 294568fe8..84066a0be 100644
--- a/src/dipdup/datasources/tzkt/datasource.py
+++ b/src/dipdup/datasources/tzkt/datasource.py
@@ -3,7 +3,6 @@
 from collections import defaultdict
 from datetime import datetime, timezone
 from decimal import Decimal
-from enum import Enum
 from typing import Any, AsyncGenerator, DefaultDict, Dict, List, NoReturn, Optional, Set, Tuple, cast
 
 from aiohttp import ClientResponseError
@@ -21,55 +20,18 @@
     ResolvedIndexConfigT,
 )
 from dipdup.datasources.datasource import IndexDatasource
-from dipdup.datasources.tzkt.enums import TzktMessageType
+from dipdup.datasources.tzkt.enums import (
+    ORIGINATION_MIGRATION_FIELDS,
+    ORIGINATION_OPERATION_FIELDS,
+    TRANSACTION_OPERATION_FIELDS,
+    OperationFetcherRequest,
+    TzktMessageType,
+)
 from dipdup.enums import MessageType
 from dipdup.models import BigMapAction, BigMapData, BlockData, HeadBlockData, OperationData, QuoteData
 from dipdup.utils import groupby, split_by_chunks
 
 TZKT_ORIGINATIONS_REQUEST_LIMIT = 100
-OPERATION_FIELDS = (
-    "type",
-    "id",
-    "level",
-    "timestamp",
-    "hash",
-    "counter",
-    "sender",
-    "nonce",
-    "target",
-    "initiator",
-    "amount",
-    "storage",
-    "status",
-    "hasInternals",
-    "diffs",
-)
-ORIGINATION_MIGRATION_FIELDS = (
-    "id",
-    "level",
-    "timestamp",
-    "storage",
-    "diffs",
-    "account",
-    "balanceChange",
-)
-ORIGINATION_OPERATION_FIELDS = (
-    *OPERATION_FIELDS,
-    "originatedContract",
-)
-TRANSACTION_OPERATION_FIELDS = (
-    *OPERATION_FIELDS,
-    "parameter",
-    "hasInternals",
-)
-
-
-class OperationFetcherChannel(Enum):
-    """Represents multiple TzKT calls to be merged into a single batch of operations"""
-
-    sender_transactions = 'sender_transactions'
-    target_transactions = 'target_transactions'
-    originations = 'originations'
 
 
 def dedup_operations(operations: List[OperationData]) -> List[OperationData]:
@@ -104,9 +66,9 @@ def __init__(
         self._logger = logging.getLogger('dipdup.tzkt')
         self._head: int = 0
-        self._heads: Dict[OperationFetcherChannel, int] = {}
-        self._offsets: Dict[OperationFetcherChannel, int] = {}
-        self._fetched: Dict[OperationFetcherChannel, bool] = {}
+        self._heads: Dict[OperationFetcherRequest, int] = {}
+        self._offsets: Dict[OperationFetcherRequest, int] = {}
+        self._fetched: Dict[OperationFetcherRequest, bool] = {}
 
         self._operations: DefaultDict[int, List[OperationData]]
         if migration_originations:
@@ -123,7 +85,7 @@ def _get_operations_head(self, operations: List[OperationData]) -> int:
 
     async def _fetch_originations(self) -> None:
         """Fetch a single batch of originations, bump channel offset"""
-        key = OperationFetcherChannel.originations
+        key = OperationFetcherRequest.originations
         if not self._origination_addresses:
             self._fetched[key] = True
             self._heads[key] = self._last_level
@@ -157,7 +119,7 @@ async def _fetch_originations(self) -> None:
 
     async def _fetch_transactions(self, field: str) -> None:
         """Fetch a single batch of transactions, bump channel offset"""
-        key = getattr(OperationFetcherChannel, field + '_transactions')
+        key = getattr(OperationFetcherRequest, field + '_transactions')
         if not self._transaction_addresses:
             self._fetched[key] = True
             self._heads[key] = self._last_level
@@ -193,9 +155,9 @@ async def _fetch_transactions(self, field: str) -> None:
     async def fetch_operations_by_level(self) -> AsyncGenerator[Tuple[int, List[OperationData]], None]:
         """Iterate by operations from multiple channels. Return is splitted by level, deduped/sorted and ready to be passeed to Matcher."""
         for type_ in (
-            OperationFetcherChannel.sender_transactions,
-            OperationFetcherChannel.target_transactions,
-            OperationFetcherChannel.originations,
+            OperationFetcherRequest.sender_transactions,
+            OperationFetcherRequest.target_transactions,
+            OperationFetcherRequest.originations,
         ):
             self._heads[type_] = 0
             self._offsets[type_] = 0
@@ -203,11 +165,11 @@ async def fetch_operations_by_level(self) -> AsyncGenerator[Tuple[int, List[Oper
 
         while True:
             min_head = sorted(self._heads.items(), key=lambda x: x[1])[0][0]
-            if min_head == OperationFetcherChannel.originations:
+            if min_head == OperationFetcherRequest.originations:
                 await self._fetch_originations()
-            elif min_head == OperationFetcherChannel.target_transactions:
+            elif min_head == OperationFetcherRequest.target_transactions:
                 await self._fetch_transactions('target')
-            elif min_head == OperationFetcherChannel.sender_transactions:
+            elif min_head == OperationFetcherRequest.sender_transactions:
                 await self._fetch_transactions('sender')
             else:
                 raise RuntimeError

diff --git a/src/dipdup/datasources/tzkt/enums.py b/src/dipdup/datasources/tzkt/enums.py
index 836f9d82b..cee21abed 100644
--- a/src/dipdup/datasources/tzkt/enums.py
+++ b/src/dipdup/datasources/tzkt/enums.py
@@ -5,3 +5,48 @@ class TzktMessageType(Enum):
     STATE = 0
     DATA = 1
     REORG = 2
+
+
+OPERATION_FIELDS = (
+    "type",
+    "id",
+    "level",
+    "timestamp",
+    "hash",
+    "counter",
+    "sender",
+    "nonce",
+    "target",
+    "initiator",
+    "amount",
+    "storage",
+    "status",
+    "hasInternals",
+    "diffs",
+)
+ORIGINATION_MIGRATION_FIELDS = (
+    "id",
+    "level",
+    "timestamp",
+    "storage",
+    "diffs",
+    "account",
+    "balanceChange",
+)
+ORIGINATION_OPERATION_FIELDS = (
+    *OPERATION_FIELDS,
+    "originatedContract",
+)
+TRANSACTION_OPERATION_FIELDS = (
+    *OPERATION_FIELDS,
+    "parameter",
+    "hasInternals",
+)
+
+
+class OperationFetcherRequest(Enum):
+    """Represents multiple TzKT calls to be merged into a single batch of operations"""
+
+    sender_transactions = 'sender_transactions'
+    target_transactions = 'target_transactions'
+    originations = 'originations'
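The `OperationFetcherRequest` enum moved in patch 09 drives the scheduling loop of `fetch_operations_by_level`: three independently paginated requests advance in parallel, and the fetcher always continues the one whose head level is lowest, because every level below the minimum head is guaranteed to be complete. A loose sketch of that idea only; the page size and channel names here are illustrative, not the real API:

    heads = {'originations': 0, 'sender_transactions': 0, 'target_transactions': 0}

    def fetch_next_page(channel: str) -> int:
        # stand-in for one REST request; returns the level this channel reached
        return heads[channel] + 10

    for _ in range(6):
        channel = min(heads, key=heads.__getitem__)
        heads[channel] = fetch_next_page(channel)
        print('advanced', channel, '->', heads[channel], '| complete up to', min(heads.values()))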
From 4cbba24d8667f48dd622f4f005b73f5f7eea0c25 Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Sat, 2 Oct 2021 16:17:34 +0300
Subject: [PATCH 10/29] Refactor `_extract_message_data` method

---
 src/dipdup/datasources/tzkt/datasource.py | 25 ++++++++++++++---------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py
index 84066a0be..8f7f8b70c 100644
--- a/src/dipdup/datasources/tzkt/datasource.py
+++ b/src/dipdup/datasources/tzkt/datasource.py
@@ -615,33 +615,38 @@ async def _subscribe_to_head(self) -> None:
         )
 
     async def _extract_message_data(self, type_: MessageType, message: List[Any]) -> AsyncGenerator[Dict, None]:
-        """"""
-        # TODO: Docstring
+        """Parse message received from Websocket, ensure it's correct in the current context and yield data."""
         for item in message:
             tzkt_type = TzktMessageType(item['type'])
-            level, last_level = item['state'], self._level[type_]
+            level, current_level = item['state'], self._level[type_]
             self._level[type_] = level
 
-            self._logger.info('Realtime message received: %s, %s, %s -> %s', type_.value, tzkt_type.name, last_level, level)
+            self._logger.info('Realtime message received: %s, %s, %s -> %s', type_.value, tzkt_type.name, current_level, level)
 
-            # NOTE: State messages will be replaced with WS negotiation some day
+            # NOTE: Ensure correctness, update sync level
             if tzkt_type == TzktMessageType.STATE:
-                if self._sync_level != level:
-                    self._logger.info('Datasource sync level set to %s', level)
+                if self._sync_level < level:
+                    self._logger.info('Datasource sync level has been updated: %s -> %s', self._sync_level, level)
                     self._sync_level = level
+                elif self._sync_level > level:
+                    raise RuntimeError('Attempt to set sync level to the lower value: %s -> %s', self._sync_level, level)
+                else:
+                    pass
 
+            # NOTE: Just yield data
             elif tzkt_type == TzktMessageType.DATA:
                 yield item['data']
 
+            # NOTE: Emit rollback, but not on `head` message
             elif tzkt_type == TzktMessageType.REORG:
-                if last_level is None:
+                if current_level is None:
                     raise RuntimeError('Reorg message received but level is not set')
                 # NOTE: operation/big_map channels have their own levels
                 if type_ == MessageType.head:
                     return
 
-                self._logger.info('Emitting rollback from %s to %s', last_level, level)
-                await self.emit_rollback(last_level, level)
+                self._logger.info('Emitting rollback from %s to %s', current_level, level)
+                await self.emit_rollback(current_level, level)
 
             else:
                 raise NotImplementedError
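For reference, the three message kinds dispatched by `_extract_message_data` map to the `TzktMessageType` codes defined in enums.py (shown in patch 09); decoding an incoming item is just an enum lookup on its `type` field. A self-contained illustration:

    from enum import Enum

    class TzktMessageType(Enum):
        STATE = 0
        DATA = 1
        REORG = 2

    item = {'type': 2, 'state': 1365000}   # a reorg notification
    print(TzktMessageType(item['type']))   # TzktMessageType.REORG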
From 1c6acaebb4f5b4ec4ace4c3bd5aaf9b72eb23a42 Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Sat, 2 Oct 2021 16:37:30 +0300
Subject: [PATCH 11/29] Allow "zero level" rollbacks, refactor `_on_rollback` callback

---
 src/dipdup/dipdup.py | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py
index b8ff79b1d..d35ba645e 100644
--- a/src/dipdup/dipdup.py
+++ b/src/dipdup/dipdup.py
@@ -5,7 +5,7 @@
 from contextlib import AsyncExitStack, asynccontextmanager, suppress
 from functools import partial
 from operator import ne
-from typing import Awaitable, Deque, Dict, List, Optional, Set, Tuple
+from typing import Awaitable, Deque, Dict, List, Optional, Set, cast
 
 from apscheduler.events import EVENT_JOB_ERROR  # type: ignore
 from tortoise.exceptions import OperationalError
@@ -167,11 +167,19 @@ async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapDa
             index.push_big_maps(big_maps)
 
     async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
-        if from_level - to_level == 1:
-            self._logger.info('Attempting a single level rollback')
+        """Perform a single level rollback when possible, otherwise call `on_rollback` hook"""
+        # NOTE: Zero difference between levels means we received no operations/big_maps on this level and thus channel level hasn't changed
+        is_single_level_rollback = from_level - to_level in (0, 1)
+
+        if is_single_level_rollback:
             # NOTE: Notify all indexes which use rolled back datasource to drop duplicated operations from the next block
-            indexes = iter(self._indexes.values())
-            matching_indexes: Tuple[OperationIndex] = tuple(filter(lambda index: index.datasource == datasource, indexes))  # type: ignore
+            self._logger.info('Attempting a single level rollback')
+            matching_indexes = tuple(
+                filter(
+                    lambda index: index.datasource == datasource,
+                    self._indexes.values(),
+                )
+            )
             all_indexes_are_operation = all(isinstance(index, OperationIndex) for index in matching_indexes)
             self._logger.info(
                 'Indexes: %s total, %s matching, all are `operation` is %s',
@@ -181,7 +189,7 @@ async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_lev
             )
 
             if all_indexes_are_operation:
-                for index in matching_indexes:
+                for index in cast(List[OperationIndex], matching_indexes):
                     await index.single_level_rollback(from_level)
                 return
 
         await self._ctx.fire_hook('on_rollback', datasource=datasource, from_level=from_level, to_level=to_level)

From df38dabc7cbb0ab8db284242ee05fdab570de4ff Mon Sep 17 00:00:00 2001
From: Lev Gorodetskiy
Date: Sat, 2 Oct 2021 16:53:31 +0300
Subject: [PATCH 12/29] Zero level rollbacks special case, more refactoring

---
 src/dipdup/dipdup.py | 30 ++++++++++++++++--------------
 src/dipdup/index.py  |  2 +-
 2 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py
index d35ba645e..c612369b3 100644
--- a/src/dipdup/dipdup.py
+++ b/src/dipdup/dipdup.py
@@ -169,31 +169,33 @@ async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapDa
     async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
         """Perform a single level rollback when possible, otherwise call `on_rollback` hook"""
         # NOTE: Zero difference between levels means we received no operations/big_maps on this level and thus channel level hasn't changed
-        is_single_level_rollback = from_level - to_level in (0, 1)
+        zero_level_rollback = from_level - to_level == 0
+        single_level_rollback = from_level - to_level == 1
 
-        if is_single_level_rollback:
+        if zero_level_rollback:
+            self._logger.info('Zero level rollback, ignoring')
+
+        elif single_level_rollback:
             # NOTE: Notify all indexes which use rolled back datasource to drop duplicated operations from the next block
-            self._logger.info('Attempting a single level rollback')
-            matching_indexes = tuple(
-                filter(
-                    lambda index: index.datasource == datasource,
-                    self._indexes.values(),
-                )
-            )
-            all_indexes_are_operation = all(isinstance(index, OperationIndex) for index in matching_indexes)
+            self._logger.info('Checking if single level rollback is possible')
+            matching_indexes = tuple(i for i in self._indexes.values() if i.datasource == datasource)
+            matching_operation_indexes = tuple(i for i in matching_indexes if isinstance(i, OperationIndex))
             self._logger.info(
-                'Indexes: %s total, %s matching, all are `operation` is %s',
+                'Indexes: %s total, %s matching, %s support single level rollback',
                 len(self._indexes),
                 len(matching_indexes),
-                all_indexes_are_operation,
+                len(matching_operation_indexes),
             )
 
+            all_indexes_are_operation = len(matching_indexes) == len(matching_operation_indexes)
             if all_indexes_are_operation:
                 for index in cast(List[OperationIndex], matching_indexes):
                     await index.single_level_rollback(from_level)
-                return
+            else:
+                await self._ctx.fire_hook('on_rollback', datasource=datasource, from_level=from_level, to_level=to_level)
 
-        await self._ctx.fire_hook('on_rollback', datasource=datasource, from_level=from_level, to_level=to_level)
+        else:
+            await self._ctx.fire_hook('on_rollback', datasource=datasource, from_level=from_level, to_level=to_level)

diff --git a/src/dipdup/index.py b/src/dipdup/index.py
index 699d79799..b3e67f325 100644
--- a/src/dipdup/index.py
+++ b/src/dipdup/index.py
@@ -160,7 +160,7 @@ async def single_level_rollback(self, level: int) -> None:
         if state_level < level:
             self._logger.info('Index level is lower than rollback level, ignoring: %s < %s', state_level, level)
         elif state_level == level:
-            self._logger.info('Single level rollback has been triggered')
+            self._logger.info('Single level rollback, next block will be processed partially')
             self._rollback_level = level
         else:
             raise RuntimeError(f'Index level is higher than rollback level: {state_level} > {level}')
operations of level %s', len(operations), level) @@ -466,12 +468,9 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None: await self._exit_sync_state(last_level) async def _process_level_big_maps(self, big_maps: List[BigMapData]): - # TODO: extract_level - batch_levels = tuple(set(big_map.level for big_map in big_maps)) - if len(batch_levels) != 1: - raise RuntimeError(f'Operations in batch have different levels: {batch_levels}') - level = tuple(batch_levels)[0] + level = self._extract_level(big_maps) + # NOTE: le operator because single level rollbacks are not supported if level <= self.state.level: raise RuntimeError(f'Level of big map batch must be higher than index state level: {level} <= {self.state.level}') From 754dc55a29b4155899f99daf9aa203aa3403c7bf Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 17:38:26 +0300 Subject: [PATCH 14/29] Replace some lists with deques --- src/dipdup/datasources/datasource.py | 10 ++-- src/dipdup/datasources/tzkt/datasource.py | 12 ++-- src/dipdup/index.py | 14 ++--- tests/integration_tests/test_rollback.py | 67 ++++++++++++----------- 4 files changed, 54 insertions(+), 49 deletions(-) diff --git a/src/dipdup/datasources/datasource.py b/src/dipdup/datasources/datasource.py index 743e29651..370066ea6 100644 --- a/src/dipdup/datasources/datasource.py +++ b/src/dipdup/datasources/datasource.py @@ -1,6 +1,6 @@ import logging from abc import abstractmethod -from typing import Awaitable, Callable, List, Set +from typing import Awaitable, Callable, Set, Tuple from dipdup.config import HTTPConfig from dipdup.http import HTTPGateway @@ -11,8 +11,8 @@ HeadCallbackT = Callable[['IndexDatasource', HeadBlockData], Awaitable[None]] -OperationsCallbackT = Callable[['IndexDatasource', List[OperationData]], Awaitable[None]] -BigMapsCallbackT = Callable[['IndexDatasource', List[BigMapData]], Awaitable[None]] +OperationsCallbackT = Callable[['IndexDatasource', Tuple[OperationData, ...]], Awaitable[None]] +BigMapsCallbackT = Callable[['IndexDatasource', Tuple[BigMapData, ...]], Awaitable[None]] RollbackCallbackT = Callable[['IndexDatasource', int, int], Awaitable[None]] @@ -57,11 +57,11 @@ async def emit_head(self, head: HeadBlockData) -> None: for fn in self._on_head: await fn(self, head) - async def emit_operations(self, operations: List[OperationData]) -> None: + async def emit_operations(self, operations: Tuple[OperationData, ...]) -> None: for fn in self._on_operations: await fn(self, operations) - async def emit_big_maps(self, big_maps: List[BigMapData]) -> None: + async def emit_big_maps(self, big_maps: Tuple[BigMapData, ...]) -> None: for fn in self._on_big_maps: await fn(self, big_maps) diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py index d9ada87ba..c121bde7f 100644 --- a/src/dipdup/datasources/tzkt/datasource.py +++ b/src/dipdup/datasources/tzkt/datasource.py @@ -1,9 +1,9 @@ import asyncio import logging -from collections import defaultdict +from collections import defaultdict, deque from datetime import datetime, timezone from decimal import Decimal -from typing import Any, AsyncGenerator, DefaultDict, Dict, List, NoReturn, Optional, Set, Tuple, cast +from typing import Any, AsyncGenerator, DefaultDict, Deque, Dict, List, NoReturn, Optional, Set, Tuple, cast from aiohttp import ClientResponseError from aiosignalrcore.hub.base_hub_connection import BaseHubConnection # type: ignore @@ -660,23 +660,23 @@ async def _extract_message_data(self, type_: MessageType, 
message: List[Any]) -> async def _on_operations_message(self, message: List[Dict[str, Any]]) -> None: """Parse and emit raw operations from WS""" async for data in self._extract_message_data(MessageType.operation, message): - operations = [] + operations: Deque[OperationData] = deque() for operation_json in data: if operation_json['status'] != 'applied': continue operation = self.convert_operation(operation_json) operations.append(operation) if operations: - await self.emit_operations(operations) + await self.emit_operations(tuple(operations)) async def _on_big_maps_message(self, message: List[Dict[str, Any]]) -> None: """Parse and emit raw big map diffs from WS""" async for data in self._extract_message_data(MessageType.big_map, message): - big_maps = [] + big_maps: Deque[BigMapData] = deque() for big_map_json in data: big_map = self.convert_big_map(big_map_json) big_maps.append(big_map) - await self.emit_big_maps(big_maps) + await self.emit_big_maps(tuple(big_maps)) async def _on_head_message(self, message: List[Dict[str, Any]]) -> None: """Parse and emit raw head block from WS""" diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 5a2457fb8..0d4768cdd 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -1,6 +1,6 @@ from abc import abstractmethod from collections import defaultdict, deque, namedtuple -from typing import Deque, Dict, List, Optional, Set, Tuple, Union, cast +from typing import Deque, Dict, Iterable, List, Optional, Set, Tuple, Union, cast from pydantic.error_wrappers import ValidationError @@ -285,10 +285,10 @@ async def _match_operation(self, pattern_config: OperationHandlerPatternConfigT, else: raise NotImplementedError - async def _process_operations(self, operations: List[OperationData]) -> None: + async def _process_operations(self, operations: Iterable[OperationData]) -> None: """Try to match operations in cache with all patterns from indexes. Must be wrapped in transaction.""" self._head_hashes = set() - operation_subgroups: Dict[OperationSubgroup, List[OperationData]] = defaultdict(list) + operation_subgroups: Dict[OperationSubgroup, Deque[OperationData]] = defaultdict(deque) for operation in operations: key = OperationSubgroup(operation.hash, operation.counter) operation_subgroups[key].append(operation) @@ -300,7 +300,7 @@ async def _process_operations(self, operations: List[OperationData]) -> None: for handler_config in self._config.handlers: operation_idx = 0 pattern_idx = 0 - matched_operations: List[Optional[OperationData]] = [] + matched_operations: Deque[Optional[OperationData]] = deque() # TODO: Ensure complex cases work, for ex. required argument after optional one # TODO: Add None to matched_operations where applicable (pattern is optional and operation not found) @@ -328,7 +328,7 @@ async def _process_operations(self, operations: List[OperationData]) -> None: if pattern_idx == len(handler_config.pattern): await self._on_match(operation_subgroup, handler_config, matched_operations) - matched_operations = [] + matched_operations.clear() pattern_idx = 0 if len(matched_operations) >= sum(map(lambda x: 0 if x.optional else 1, handler_config.pattern)): @@ -338,7 +338,7 @@ async def _on_match( self, operation_subgroup: OperationSubgroup, handler_config: OperationHandlerConfig, - matched_operations: List[Optional[OperationData]], + matched_operations: Deque[Optional[OperationData]], ): """Prepare handler arguments, parse parameter and storage. 
Schedule callback in executor.""" self._logger.info('%s: `%s` handler matched!', operation_subgroup.hash, handler_config.callback) @@ -531,7 +531,7 @@ async def _on_match( big_map_diff, ) - async def _process_big_maps(self, big_maps: List[BigMapData]) -> None: + async def _process_big_maps(self, big_maps: Iterable[BigMapData]) -> None: """Try to match big map diffs in cache with all patterns from indexes.""" for big_map in big_maps: diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py index 88e4d13e0..36fe43a7d 100644 --- a/tests/integration_tests/test_rollback.py +++ b/tests/integration_tests/test_rollback.py @@ -1,13 +1,13 @@ import asyncio from datetime import datetime from os.path import dirname, join +from typing import Tuple from unittest import IsolatedAsyncioTestCase -from unittest.mock import MagicMock, patch +from unittest.mock import patch from dipdup.config import DipDupConfig from dipdup.datasources.tzkt.datasource import TzktDatasource from dipdup.dipdup import DipDup -from dipdup.models import HeadBlockData from dipdup.models import Index as State from dipdup.models import OperationData @@ -31,45 +31,50 @@ def _get_operation(hash_: str, level: int) -> OperationData: ) -# NOTE: Same level, different hash -old_block = MagicMock(spec=HeadBlockData) -old_block.hash = 'block_a' -old_block.level = 1365001 -old_block.timestamp = datetime(2018, 1, 1) -new_block = MagicMock(spec=HeadBlockData) -new_block.hash = 'block_b' -new_block.level = 1365001 -new_block.timestamp = datetime(2018, 1, 1) +exact_operations = ( + _get_operation('1', 1365001), + _get_operation('2', 1365001), + _get_operation('3', 1365001), +) +less_operations = ( + _get_operation('1', 1365001), + _get_operation('2', 1365001), +) -# NOTE: Emit operations, rollback, emit again, check state -async def datasource_run(self: TzktDatasource): - await self.emit_operations( - [ - _get_operation('1', 1365001), - _get_operation('2', 1365001), - _get_operation('3', 1365001), - ], - ) +more_operations = ( + _get_operation('1', 1365001), + _get_operation('2', 1365001), + _get_operation('3', 1365001), + _get_operation('4', 1365001), +) + + +async def check_level() -> None: + state = await State.filter(name='hen_mainnet').get() + assert state.level == 1365001, state.level + + +async def emit_messages( + self: TzktDatasource, + old_block: Tuple[OperationData, ...], + new_block: Tuple[OperationData, ...], +): + await self.emit_operations(old_block) await self.emit_rollback( from_level=1365001, to_level=1365000, ) - await self.emit_operations( - [ - _get_operation('1', 1365001), - _get_operation('2', 1365001), - _get_operation('3', 1365001), - ] - ) - - # NOTE: Assert while Tortoise is still running - state = await State.filter(name='hen_mainnet').get() - assert state.level == 1365001, state.level + await self.emit_operations(new_block) raise asyncio.CancelledError +# NOTE: Emit operations, rollback, emit again, check state +async def datasource_run(self: TzktDatasource): + await emit_messages(self, exact_operations, exact_operations) + + class RollbackTest(IsolatedAsyncioTestCase): async def test_rollback_ok(self): # Arrange From f3c5e58c058bb084435971cb46b3668f6551b58d Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 17:51:10 +0300 Subject: [PATCH 15/29] test_rollback WIP --- tests/integration_tests/test_rollback.py | 33 ++++++++++++------------ 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/tests/integration_tests/test_rollback.py 
b/tests/integration_tests/test_rollback.py index 36fe43a7d..372607699 100644 --- a/tests/integration_tests/test_rollback.py +++ b/tests/integration_tests/test_rollback.py @@ -30,6 +30,7 @@ def _get_operation(hash_: str, level: int) -> OperationData: has_internals=False, ) +initial_level = 1365000 exact_operations = ( _get_operation('1', 1365001), @@ -50,41 +51,41 @@ def _get_operation(hash_: str, level: int) -> OperationData: ) -async def check_level() -> None: +async def check_level(level: int) -> None: state = await State.filter(name='hen_mainnet').get() - assert state.level == 1365001, state.level + assert state.level == level, state.level async def emit_messages( self: TzktDatasource, old_block: Tuple[OperationData, ...], new_block: Tuple[OperationData, ...], + level: int, ): await self.emit_operations(old_block) await self.emit_rollback( from_level=1365001, - to_level=1365000, + to_level=1365001 - level, ) await self.emit_operations(new_block) raise asyncio.CancelledError -# NOTE: Emit operations, rollback, emit again, check state -async def datasource_run(self: TzktDatasource): - await emit_messages(self, exact_operations, exact_operations) +async def datasource_run_exact(self: TzktDatasource): + await emit_messages(self, exact_operations, exact_operations, 1) + await check_level(initial_level + 1) class RollbackTest(IsolatedAsyncioTestCase): - async def test_rollback_ok(self): - # Arrange - config = DipDupConfig.load([join(dirname(__file__), 'hic_et_nunc.yml')]) - config.database.path = ':memory:' - config.indexes['hen_mainnet'].last_level = config.indexes['hen_mainnet'].first_level + 1 - config.initialize() + async def asyncSetUp(self) -> None: + self.config = DipDupConfig.load([join(dirname(__file__), 'hic_et_nunc.yml')]) + self.config.database.path = ':memory:' + self.config.indexes['hen_mainnet'].last_level = self.config.indexes['hen_mainnet'].first_level + 1 + self.config.initialize() + self.dipdup = DipDup(self.config) - dipdup = DipDup(config) - # Act - with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run): - await dipdup.run(False, False, False) + async def test_rollback_ok(self): + with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_exact): + await self.dipdup.run(False, False, False) From 0a6842f3702c19f9db1c90e7eec0143af093ad03 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 17:53:33 +0300 Subject: [PATCH 16/29] Another test --- tests/integration_tests/test_rollback.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py index 372607699..185cc4a34 100644 --- a/tests/integration_tests/test_rollback.py +++ b/tests/integration_tests/test_rollback.py @@ -5,7 +5,7 @@ from unittest import IsolatedAsyncioTestCase from unittest.mock import patch -from dipdup.config import DipDupConfig +from dipdup.config import DipDupConfig, PostgresDatabaseConfig from dipdup.datasources.tzkt.datasource import TzktDatasource from dipdup.dipdup import DipDup from dipdup.models import Index as State @@ -77,15 +77,24 @@ async def datasource_run_exact(self: TzktDatasource): await check_level(initial_level + 1) +async def datasource_run_more(self: TzktDatasource): + await emit_messages(self, exact_operations, more_operations, 1) + await check_level(initial_level + 1) + + class RollbackTest(IsolatedAsyncioTestCase): async def asyncSetUp(self) -> None: self.config = DipDupConfig.load([join(dirname(__file__), 
'hic_et_nunc.yml')]) - self.config.database.path = ':memory:' - self.config.indexes['hen_mainnet'].last_level = self.config.indexes['hen_mainnet'].first_level + 1 + self.config.database.path = ':memory:' # type: ignore + self.config.indexes['hen_mainnet'].last_level = self.config.indexes['hen_mainnet'].first_level + 1 # type: ignore self.config.initialize() self.dipdup = DipDup(self.config) - async def test_rollback_ok(self): + async def test_rollback_exact(self): with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_exact): await self.dipdup.run(False, False, False) + + async def test_rollback_more(self): + with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_more): + await self.dipdup.run(False, False, False) From 14e6adbad709a58e532d80f3ff5a97afdd29e114 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 18:30:35 +0300 Subject: [PATCH 17/29] Another test --- tests/integration_tests/test_rollback.py | 28 +++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py index 185cc4a34..43255420f 100644 --- a/tests/integration_tests/test_rollback.py +++ b/tests/integration_tests/test_rollback.py @@ -1,16 +1,19 @@ import asyncio +import logging from datetime import datetime from os.path import dirname, join from typing import Tuple -from unittest import IsolatedAsyncioTestCase +from unittest import IsolatedAsyncioTestCase, skip from unittest.mock import patch -from dipdup.config import DipDupConfig, PostgresDatabaseConfig +from dipdup.config import DipDupConfig from dipdup.datasources.tzkt.datasource import TzktDatasource from dipdup.dipdup import DipDup from dipdup.models import Index as State from dipdup.models import OperationData +logging.basicConfig(level=logging.DEBUG) + def _get_operation(hash_: str, level: int) -> OperationData: return OperationData( @@ -30,6 +33,7 @@ def _get_operation(hash_: str, level: int) -> OperationData: has_internals=False, ) + initial_level = 1365000 exact_operations = ( @@ -82,6 +86,16 @@ async def datasource_run_more(self: TzktDatasource): await check_level(initial_level + 1) +async def datasource_run_less(self: TzktDatasource): + await emit_messages(self, exact_operations, less_operations, 1) + await check_level(initial_level + 1) + + +async def datasource_run_zero(self: TzktDatasource): + await emit_messages(self, (), (exact_operations), 0) + await check_level(initial_level + 1) + + class RollbackTest(IsolatedAsyncioTestCase): async def asyncSetUp(self) -> None: self.config = DipDupConfig.load([join(dirname(__file__), 'hic_et_nunc.yml')]) @@ -90,7 +104,6 @@ async def asyncSetUp(self) -> None: self.config.initialize() self.dipdup = DipDup(self.config) - async def test_rollback_exact(self): with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_exact): await self.dipdup.run(False, False, False) @@ -98,3 +111,12 @@ async def test_rollback_exact(self): async def test_rollback_more(self): with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_more): await self.dipdup.run(False, False, False) + + @skip('FIXME') + async def test_rollback_less(self): + with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_less): + await self.dipdup.run(False, False, False) + + async def test_rollback_zero(self): + with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_zero): + await 
self.dipdup.run(False, False, False) From 7ad955d4ce86259b89fbf235bb4e0bc5def71a3b Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 18:30:45 +0300 Subject: [PATCH 18/29] Fix realtime atomicity --- src/dipdup/index.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 0d4768cdd..032d89253 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -180,7 +180,7 @@ async def _process_queue(self) -> None: if isinstance(message, SingleLevelRollback): await self.single_level_rollback(message.level) elif isinstance(message, list): - await self._process_operations(message) + await self._process_level_operations(message) else: raise RuntimeError From a221c8841a552202b98b1caaa560cd79c9827045 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 18:48:48 +0300 Subject: [PATCH 19/29] More tests --- src/dipdup/index.py | 4 +--- tests/integration_tests/test_rollback.py | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 032d89253..ebd5d295e 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -179,10 +179,8 @@ async def _process_queue(self) -> None: message = self._queue.popleft() if isinstance(message, SingleLevelRollback): await self.single_level_rollback(message.level) - elif isinstance(message, list): - await self._process_level_operations(message) else: - raise RuntimeError + await self._process_level_operations(message) async def _synchronize(self, last_level: int, cache: bool = False) -> None: """Fetch operations via Fetcher and pass to message callback""" diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py index 43255420f..fa2a5abae 100644 --- a/tests/integration_tests/test_rollback.py +++ b/tests/integration_tests/test_rollback.py @@ -4,7 +4,7 @@ from os.path import dirname, join from typing import Tuple from unittest import IsolatedAsyncioTestCase, skip -from unittest.mock import patch +from unittest.mock import AsyncMock, patch from dipdup.config import DipDupConfig from dipdup.datasources.tzkt.datasource import TzktDatasource @@ -96,6 +96,11 @@ async def datasource_run_zero(self: TzktDatasource): await check_level(initial_level + 1) +async def datasource_run_deep(self: TzktDatasource): + await emit_messages(self, (exact_operations), (), 1337) + await check_level(initial_level + 1) + + class RollbackTest(IsolatedAsyncioTestCase): async def asyncSetUp(self) -> None: self.config = DipDupConfig.load([join(dirname(__file__), 'hic_et_nunc.yml')]) @@ -115,8 +120,16 @@ async def test_rollback_more(self): @skip('FIXME') async def test_rollback_less(self): with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_less): - await self.dipdup.run(False, False, False) + with patch('dipdup.context.DipDupContext.reindex', AsyncMock()) as reindex_mock: + await self.dipdup.run(False, False, False) + assert reindex_mock.call_count == 1 async def test_rollback_zero(self): with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_zero): await self.dipdup.run(False, False, False) + + async def test_rollback_deep(self): + with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_deep): + with patch('dipdup.context.DipDupContext.reindex', AsyncMock()) as reindex_mock: + await self.dipdup.run(False, False, False) + assert reindex_mock.call_count == 1 From 7a5c8ce0c3c560d73de52e4c5b1a76f8c049aff2 Mon Sep 17 
00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 21:22:20 +0300 Subject: [PATCH 20/29] Update changelog --- CHANGELOG.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6a525b3b..bc8961cd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,16 +2,21 @@ ## [unreleased] +### Improved + +* Improved performance of initial index synchronization via REST. + ### Fixed -* Fixed possible race condition during single level rollback. -* Removed unnecessary file IO calls, improved logging. +* Fixed processing zero- and single-level rollbacks. +* Removed unnecessary file IO calls causing `PermissionError` exception in Docker environment. +* Fixed possible atomicity issue during real-time indexing. ## 3.0.3 - 2021-10-01 ### Fixed -* Fixed processing of single level rollbacks emitted before rolled back head. +* Fixed processing of single-level rollbacks emitted before rolled back head. ## 3.0.2 - 2021-09-30 From 514ffa2a610dc2be7c2c68f6a2fb41c3c37d9d71 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sat, 2 Oct 2021 23:45:49 +0300 Subject: [PATCH 21/29] Disable logger --- tests/integration_tests/test_rollback.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py index fa2a5abae..887c1aa92 100644 --- a/tests/integration_tests/test_rollback.py +++ b/tests/integration_tests/test_rollback.py @@ -1,5 +1,4 @@ import asyncio -import logging from datetime import datetime from os.path import dirname, join from typing import Tuple @@ -12,7 +11,8 @@ from dipdup.models import Index as State from dipdup.models import OperationData -logging.basicConfig(level=logging.DEBUG) +# import logging +# logging.basicConfig(level=logging.DEBUG) def _get_operation(hash_: str, level: int) -> OperationData: From dda73d14bff00c3d221bbab3ed07202e35e2eab6 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sun, 3 Oct 2021 00:06:01 +0300 Subject: [PATCH 22/29] Warn on rollbacks --- src/dipdup/dipdup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index c612369b3..ebaa40982 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -168,6 +168,8 @@ async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapDa async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None: """Perform a single level rollback when possible, otherwise call `on_rollback` hook""" + self._logger.warning('Datasource `%s` rolled back: %s -> %s', datasource.name, from_level, to_level) + # NOTE: Zero difference between levels means we received no operations/big_maps on this level and thus channel level hasn't changed zero_level_rollback = from_level - to_level == 0 single_level_rollback = from_level - to_level == 1 From d2a229aab87de44f18a5b588dd5d0e1281a1809c Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sun, 3 Oct 2021 00:29:49 +0300 Subject: [PATCH 23/29] Optimizations --- src/dipdup/datasources/tzkt/datasource.py | 36 ++++++++++------------- src/dipdup/dipdup.py | 6 ++-- src/dipdup/index.py | 20 ++++++------- 3 files changed, 29 insertions(+), 33 deletions(-) diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py index c121bde7f..f9f6153ee 100644 --- a/src/dipdup/datasources/tzkt/datasource.py +++ b/src/dipdup/datasources/tzkt/datasource.py @@ -29,16 +29,18 @@ ) from dipdup.enums import MessageType from dipdup.models import 
BigMapAction, BigMapData, BlockData, HeadBlockData, OperationData, QuoteData -from dipdup.utils import groupby, split_by_chunks +from dipdup.utils import split_by_chunks TZKT_ORIGINATIONS_REQUEST_LIMIT = 100 -def dedup_operations(operations: List[OperationData]) -> List[OperationData]: +def dedup_operations(operations: Tuple[OperationData, ...]) -> Tuple[OperationData, ...]: """Merge operations from multiple endpoints""" -    return sorted( -        list(({op.id: op for op in operations}).values()), -        key=lambda op: op.id, +    return tuple( +        sorted( +            tuple(({op.id: op for op in operations}).values()), +            key=lambda op: op.id, +        ) ) @@ -55,7 +57,7 @@ def __init__( transaction_addresses: Set[str], origination_addresses: Set[str], cache: bool = False, -        migration_originations: List[OperationData] = None, +        migration_originations: Tuple[OperationData, ...] = None, ) -> None: self._datasource = datasource self._first_level = first_level @@ -70,11 +72,9 @@ def __init__( self._offsets: Dict[OperationFetcherRequest, int] = {} self._fetched: Dict[OperationFetcherRequest, bool] = {} -        self._operations: DefaultDict[int, List[OperationData]] -        if migration_originations: -            self._operations = groupby(migration_originations, lambda op: op.level) -        else: -            self._operations = defaultdict(list) +        self._operations: DefaultDict[int, Deque[OperationData]] = defaultdict(deque) +        for origination in migration_originations or (): +            self._operations[origination.level].append(origination) def _get_operations_head(self, operations: List[OperationData]) -> int: """Get latest block level (head) of sorted operations batch""" @@ -104,8 +104,6 @@ async def _fetch_originations(self) -> None: for op in originations: level = op.level -            if level not in self._operations: -                self._operations[level] = [] self._operations[level].append(op) self._logger.debug('Got %s', len(originations)) @@ -139,8 +137,6 @@ async def _fetch_transactions(self, field: str) -> None: for op in transactions: level = op.level -            if level not in self._operations: -                self._operations[level] = [] self._operations[level].append(op) self._logger.debug('Got %s', len(transactions)) @@ -152,7 +148,7 @@ async def _fetch_transactions(self, field: str) -> None: self._offsets[key] += self._datasource.request_limit self._heads[key] = self._get_operations_head(transactions) -    async def fetch_operations_by_level(self) -> AsyncGenerator[Tuple[int, List[OperationData]], None]: +    async def fetch_operations_by_level(self) -> AsyncGenerator[Tuple[int, Tuple[OperationData, ...]], None]: """Iterate over operations fetched with multiple REST requests with different filters. Resulting data is split by level, deduped, sorted and ready to be processed by OperationIndex. @@ -181,7 +177,7 @@ async def fetch_operations_by_level(self) -> AsyncGenerator[Tuple[int, List[Oper while self._head <= head: if self._head in self._operations: operations = self._operations.pop(self._head) -                yield self._head, dedup_operations(operations) +                yield self._head, dedup_operations(tuple(operations)) self._head += 1 if all(list(self._fetched.values())): @@ -208,7 +204,7 @@ def __init__( self._big_map_paths = big_map_paths self._cache = cache -    async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, List[BigMapData]], None]: +    async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, Tuple[BigMapData, ...]], None]: """Iterate over big map diffs fetched from REST. Resulting data is split by level, deduped, sorted and ready to be processed by BigMapIndex.
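Aside: a minimal, runnable sketch of the dedup/sort pattern that `dedup_operations` above implements; `Op` is a hypothetical stand-in for `OperationData`, with only the `id` field mattering here.

    from dataclasses import dataclass
    from typing import Tuple

    @dataclass(frozen=True)
    class Op:
        id: int     # unique operation id assigned by TzKT
        level: int  # block level

    def dedup_operations(operations: Tuple[Op, ...]) -> Tuple[Op, ...]:
        # Dict keyed by id keeps one entry per operation; sorting restores id order
        return tuple(sorted({op.id: op for op in operations}.values(), key=lambda op: op.id))

    # The same operation may be fetched from both sender and target endpoints
    ops = (Op(id=7, level=100), Op(id=5, level=100), Op(id=7, level=100))
    assert dedup_operations(ops) == (Op(id=5, level=100), Op(id=7, level=100))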
@@ -231,7 +227,7 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, List[BigMap while True: for i in range(len(big_maps) - 1): if big_maps[i].level != big_maps[i + 1].level: -                    yield big_maps[i].level, big_maps[: i + 1] +                    yield big_maps[i].level, tuple(big_maps[: i + 1]) big_maps = big_maps[i + 1 :]  # noqa: E203 break else: @@ -243,7 +239,7 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, List[BigMap offset += self._datasource.request_limit if big_maps: -            yield big_maps[0].level, big_maps[: i + 1] +            yield big_maps[0].level, tuple(big_maps[: i + 1]) class TzktDatasource(IndexDatasource): diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index ebaa40982..6cdf4f12f 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -5,7 +5,7 @@ from contextlib import AsyncExitStack, asynccontextmanager, suppress from functools import partial from operator import ne -from typing import Awaitable, Deque, Dict, List, Optional, Set, cast +from typing import Awaitable, Deque, Dict, List, Optional, Set, Tuple, cast from apscheduler.events import EVENT_JOB_ERROR  # type: ignore from tortoise.exceptions import OperationalError @@ -156,12 +156,12 @@ async def _on_head(self, datasource: TzktDatasource, head: HeadBlockData) -> Non ) ) -    async def _on_operations(self, datasource: TzktDatasource, operations: List[OperationData]) -> None: +    async def _on_operations(self, datasource: TzktDatasource, operations: Tuple[OperationData, ...]) -> None: for index in self._indexes.values(): if isinstance(index, OperationIndex) and index.datasource == datasource: index.push_operations(operations) -    async def _on_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData]) -> None: +    async def _on_big_maps(self, datasource: TzktDatasource, big_maps: Tuple[BigMapData, ...]) -> None: for index in self._indexes.values(): if isinstance(index, BigMapIndex) and index.datasource == datasource: index.push_big_maps(big_maps) diff --git a/src/dipdup/index.py b/src/dipdup/index.py index ebd5d295e..d8620002a 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -29,7 +29,7 @@ # NOTE: Message queue of OperationIndex SingleLevelRollback = namedtuple('SingleLevelRollback', ('level')) -Operations = List[OperationData] +Operations = Tuple[OperationData, ...]
OperationQueueItemT = Union[Operations, SingleLevelRollback] # NOTE: For initializing the index state on startup @@ -130,7 +130,7 @@ async def _exit_sync_state(self, last_level: int) -> None: self._logger.info('Index is synchronized to level %s', last_level) await self.state.update_status(status=IndexStatus.REALTIME, level=last_level) -    def _extract_level(self, message: Union[List[OperationData], List[BigMapData]]) -> int: +    def _extract_level(self, message: Union[Tuple[OperationData, ...], Tuple[BigMapData, ...]]) -> int: batch_levels = tuple(set(item.level for item in message)) if len(batch_levels) != 1: raise RuntimeError(f'Items in operation/big_map batch have different levels: {batch_levels}') @@ -148,7 +148,7 @@ def __init__(self, ctx: DipDupContext, config: OperationIndexConfig, datasource: self._head_hashes: Set[str] = set() self._migration_originations: Optional[Dict[str, OperationData]] = None -    def push_operations(self, operations: List[OperationData]) -> None: +    def push_operations(self, operations: Tuple[OperationData, ...]) -> None: self._queue.append(operations) def push_rollback(self, level: int) -> None: @@ -192,9 +192,9 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None: transaction_addresses = await self._get_transaction_addresses() origination_addresses = await self._get_origination_addresses() -        migration_originations = [] +        migration_originations: Tuple[OperationData, ...] = () if self._config.types and OperationType.migration in self._config.types: -            migration_originations = await self._datasource.get_migration_originations(first_level) +            migration_originations = tuple(await self._datasource.get_migration_originations(first_level)) for op in migration_originations: code_hash, type_hash = await self._get_contract_hashes(cast(str, op.originated_contract_address)) op.originated_contract_code_hash, op.originated_contract_type_hash = code_hash, type_hash @@ -214,7 +214,7 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None: await self._exit_sync_state(last_level) -    async def _process_level_operations(self, operations: List[OperationData]) -> None: +    async def _process_level_operations(self, operations: Tuple[OperationData, ...]) -> None: level = self._extract_level(operations) if self._rollback_level: @@ -239,7 +239,7 @@ async def _process_level_operations(self, operations: Tuple[OperationData, ...]) new_hashes = received_hashes - expected_hashes if not new_hashes: return -            operations = [op for op in operations if op.hash in new_hashes] +            operations = tuple(op for op in operations if op.hash in new_hashes) # NOTE: lt operator because the same level may arrive again after a single level rollback elif level < self.state.level: @@ -427,9 +427,9 @@ class BigMapIndex(Index): def __init__(self, ctx: DipDupContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None: super().__init__(ctx, config, datasource) -        self._queue: Deque[List[BigMapData]] = deque() +        self._queue: Deque[Tuple[BigMapData, ...]] = deque() -    def push_big_maps(self, big_maps: List[BigMapData]): +    def push_big_maps(self, big_maps: Tuple[BigMapData, ...]) -> None: self._queue.append(big_maps) async def _process_queue(self) -> None: @@ -465,7 +465,7 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None: await self._exit_sync_state(last_level) -    async def _process_level_big_maps(self, big_maps: List[BigMapData]): +    async def _process_level_big_maps(self, big_maps: Tuple[BigMapData, ...]): level = self._extract_level(big_maps) # NOTE: le operator because
single level rollbacks are not supported From 9d1ad0f31e12c8b8a216acf9c4ae4f70c6e3a91c Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sun, 3 Oct 2021 00:45:28 +0300 Subject: [PATCH 24/29] Moar --- CHANGELOG.md | 2 +- src/dipdup/datasources/tzkt/datasource.py | 34 +++++++++++------------ 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc8961cd6..855a01d29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### Improved -* Improved performance of initial index synchronization via REST. +* Improved indexing performance. ### Fixed diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py index f9f6153ee..baee79ee9 100644 --- a/src/dipdup/datasources/tzkt/datasource.py +++ b/src/dipdup/datasources/tzkt/datasource.py @@ -76,7 +76,7 @@ def __init__( for origination in migration_originations or (): self._operations[origination.level].append(origination) - def _get_operations_head(self, operations: List[OperationData]) -> int: + def _get_operations_head(self, operations: Tuple[OperationData, ...]) -> int: """Get latest block level (head) of sorted operations batch""" for i in range(len(operations) - 1)[::-1]: if operations[i].level != operations[i + 1].level: @@ -211,7 +211,7 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, Tuple[BigMa """ offset = 0 - big_maps = [] + big_maps: Tuple[BigMapData, ...] = tuple() while True: fetched_big_maps = await self._datasource.get_big_maps( @@ -222,7 +222,7 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, Tuple[BigMa self._last_level, cache=self._cache, ) - big_maps += fetched_big_maps + big_maps = big_maps + fetched_big_maps while True: for i in range(len(big_maps) - 1): @@ -357,7 +357,7 @@ async def get_block(self, level: int) -> BlockData: ) return self.convert_block(block_json) - async def get_migration_originations(self, first_level: int = 0) -> List[OperationData]: + async def get_migration_originations(self, first_level: int = 0) -> Tuple[OperationData, ...]: """Get contracts originated from migrations""" self._logger.info('Fetching contracts originated with migrations') # NOTE: Empty unwrapped request to ensure API supports migration originations @@ -371,7 +371,7 @@ async def get_migration_originations(self, first_level: int = 0) -> List[Operati }, ) except ClientResponseError: - return [] + return () raw_migrations = await self._http.request( 'get', @@ -382,11 +382,11 @@ async def get_migration_originations(self, first_level: int = 0) -> List[Operati 'select': ','.join(ORIGINATION_MIGRATION_FIELDS), }, ) - return [self.convert_migration_origination(m) for m in raw_migrations] + return tuple(self.convert_migration_origination(m) for m in raw_migrations) async def get_originations( self, addresses: Set[str], offset: int, first_level: int, last_level: int, cache: bool = False - ) -> List[OperationData]: + ) -> Tuple[OperationData, ...]: raw_originations = [] # NOTE: TzKT may hit URL length limit with hundreds of originations in a single request. # NOTE: Chunk of 100 addresses seems like a reasonable choice - URL of ~3971 characters. 
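Aside: the chunking mentioned in the comment above, sketched standalone; this helper is a plausible reading of what `split_by_chunks` from `dipdup.utils` does, not its actual implementation.

    from typing import Iterator, Sequence, Tuple, TypeVar

    T = TypeVar('T')

    TZKT_ORIGINATIONS_REQUEST_LIMIT = 100

    def split_by_chunks(items: Sequence[T], size: int) -> Iterator[Tuple[T, ...]]:
        # Yield consecutive slices of at most `size` items each
        for i in range(0, len(items), size):
            yield tuple(items[i : i + size])

    addresses = tuple(f'KT1_{i}' for i in range(250))
    batches = list(split_by_chunks(addresses, TZKT_ORIGINATIONS_REQUEST_LIMIT))
    assert [len(b) for b in batches] == [100, 100, 50]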
@@ -407,16 +407,16 @@ async def get_originations( cache=cache, ) - originations = [] for op in raw_originations: # NOTE: `type` field needs to be set manually when requesting operations by specific type op['type'] = 'origination' - originations.append(self.convert_operation(op)) + + originations = tuple(self.convert_operation(op) for op in raw_originations) return originations async def get_transactions( self, field: str, addresses: Set[str], offset: int, first_level: int, last_level: int, cache: bool = False - ) -> List[OperationData]: + ) -> Tuple[OperationData, ...]: raw_transactions = await self._http.request( 'get', url='v1/operations/transactions', @@ -431,16 +431,16 @@ async def get_transactions( }, cache=cache, ) - transactions = [] for op in raw_transactions: # NOTE: type needs to be set manually when requesting operations by specific type op['type'] = 'transaction' - transactions.append(self.convert_operation(op)) + + transactions = tuple(self.convert_operation(op) for op in raw_transactions) return transactions async def get_big_maps( self, addresses: Set[str], paths: Set[str], offset: int, first_level: int, last_level: int, cache: bool = False - ) -> List[BigMapData]: + ) -> Tuple[BigMapData, ...]: raw_big_maps = await self._http.request( 'get', url='v1/bigmaps/updates', @@ -454,9 +454,7 @@ async def get_big_maps( }, cache=cache, ) - big_maps = [] - for bm in raw_big_maps: - big_maps.append(self.convert_big_map(bm)) + big_maps = tuple(self.convert_big_map(bm) for bm in raw_big_maps) return big_maps async def get_quote(self, level: int) -> QuoteData: @@ -470,7 +468,7 @@ async def get_quote(self, level: int) -> QuoteData: ) return self.convert_quote(quote_json[0]) - async def get_quotes(self, from_level: int, to_level: int) -> List[QuoteData]: + async def get_quotes(self, from_level: int, to_level: int) -> Tuple[QuoteData, ...]: """Get quotes for blocks""" self._logger.info('Fetching quotes for levels %s-%s', from_level, to_level) quotes_json = await self._http.request( @@ -483,7 +481,7 @@ async def get_quotes(self, from_level: int, to_level: int) -> List[QuoteData]: }, cache=False, ) - return [self.convert_quote(quote) for quote in quotes_json] + return tuple(self.convert_quote(quote) for quote in quotes_json) async def add_index(self, index_config: ResolvedIndexConfigT) -> None: """Register index config in internal mappings and matchers. Find and register subscriptions.""" From 65c35265e6475be3b1751701db99a0c01893f076 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sun, 3 Oct 2021 00:51:47 +0300 Subject: [PATCH 25/29] Changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 855a01d29..fde07aec9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ * Fixed processing zero- and single-level rollbacks. * Removed unnecessary file IO calls causing `PermissionError` exception in Docker environment. -* Fixed possible atomicity issue during real-time indexing. +* Fixed possible atomicity violation during real-time indexing. 
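Aside: the conversion pattern this patch applies to every `get_*` method, in isolation; `Quote` is a simplified stand-in for `QuoteData`, and its field set is an assumption.

    from dataclasses import dataclass
    from typing import Any, Dict, Iterable, Tuple

    @dataclass(frozen=True)
    class Quote:
        level: int
        usd: float

    def convert_quotes(raw: Iterable[Dict[str, Any]]) -> Tuple[Quote, ...]:
        # A generator feeding tuple(): no intermediate list, immutable result
        return tuple(Quote(level=q['level'], usd=q['usd']) for q in raw)

    quotes = convert_quotes([{'level': 1, 'usd': 5.0}, {'level': 2, 'usd': 5.1}])
    assert quotes[1].usd == 5.1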
## 3.0.3 - 2021-10-01 From 66846501a544d25719ffa6c4f8f91bcc718b89b1 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sun, 3 Oct 2021 11:21:12 +0300 Subject: [PATCH 26/29] Ability to pass additional context on reindexing --- src/dipdup/context.py | 18 +++++++++++------- src/dipdup/exceptions.py | 7 +++++-- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/dipdup/context.py b/src/dipdup/context.py index 35c057ac5..0a87cb7da 100644 --- a/src/dipdup/context.py +++ b/src/dipdup/context.py @@ -90,22 +90,26 @@ async def restart(self) -> None: sys.argv.remove('--reindex') os.execl(sys.executable, sys.executable, *sys.argv) -    async def reindex(self, reason: Optional[Union[str, ReindexingReason]] = None) -> None: +    async def reindex(self, reason: Optional[Union[str, ReindexingReason]] = None, **context) -> None: """Drop all tables or whole database and restart with the same CLI arguments""" -        reason_str = reason.value if isinstance(reason, ReindexingReason) else 'unknown' -        self.logger.warning('Reindexing initialized, reason: %s', reason_str) - -        if not reason or isinstance(reason, str): +        if not reason: +            reason = ReindexingReason.MANUAL +        elif isinstance(reason, str): +            context['message'] = reason reason = ReindexingReason.MANUAL +        reason_str = reason.value + (f' ({context["message"]})' if "message" in context else '') +        self.logger.warning('Reindexing initialized, reason: %s', reason_str) +        self.logger.info('Additional context: %s', context) + if forbid_reindexing: schema = await Schema.filter().get() if schema.reindex: -                raise ReindexingRequiredError(schema.reindex) +                raise ReindexingRequiredError(schema.reindex, context) schema.reindex = reason await schema.save() -            raise ReindexingRequiredError(schema.reindex) +            raise ReindexingRequiredError(schema.reindex, context) database_config = self.config.database if isinstance(database_config, PostgresDatabaseConfig): diff --git a/src/dipdup/exceptions.py b/src/dipdup/exceptions.py index c11decf63..72c278201 100644 --- a/src/dipdup/exceptions.py +++ b/src/dipdup/exceptions.py @@ -1,8 +1,8 @@ import textwrap import traceback from contextlib import contextmanager -from dataclasses import dataclass -from typing import Any, Iterator, Optional, Type +from dataclasses import dataclass, field +from typing import Any, Dict, Iterator, Optional, Type import sentry_sdk from tabulate import tabulate @@ -137,6 +137,7 @@ class ReindexingRequiredError(DipDupError): """Unable to continue indexing with existing database""" reason: ReindexingReason +    context: Dict[str, Any] = field(default_factory=dict) def _help(self) -> str: return f""" @@ -144,6 +145,8 @@ def _help(self) -> str: Reason: {self.reason.value} +        Additional context: {self.context} + You may want to back up the database before proceeding.
After that perform one of the following actions: * Eliminate the cause of reindexing and run `UPDATE dipdup_schema SET reindex = NULL;` From 9f5c8805132e1ea0f8b83a7c10a80ec2cf9b2eb7 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sun, 3 Oct 2021 13:26:13 +0300 Subject: [PATCH 27/29] Fix tests, enqueue single level rollback --- src/dipdup/dipdup.py | 2 +- src/dipdup/index.py | 27 +++--- tests/integration_tests/test_rollback.py | 105 +++++++++++++++-------- 3 files changed, 85 insertions(+), 49 deletions(-) diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 6cdf4f12f..fa265f1fd 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -192,7 +192,7 @@ async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_lev all_indexes_are_operation = len(matching_indexes) == len(matching_operation_indexes) if all_indexes_are_operation: for index in cast(List[OperationIndex], matching_indexes): -                await index.single_level_rollback(from_level) +                index.push_rollback(from_level) else: await self._ctx.fire_hook('on_rollback', datasource=datasource, from_level=from_level, to_level=to_level) diff --git a/src/dipdup/index.py b/src/dipdup/index.py index d8620002a..d657a4ecd 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -95,7 +95,9 @@ async def process(self) -> None: raise RuntimeError('Call `set_sync_level` before starting IndexDispatcher') elif self.state.level < self._datasource.sync_level: -            self._logger.info('Index is behind datasource, sync to datasource level') +            self._logger.info( +                'Index is behind datasource, sync to datasource level: %s -> %s', self.state.level, self._datasource.sync_level +            ) self._queue.clear() last_level = self._datasource.sync_level await self._synchronize(last_level) @@ -154,7 +156,7 @@ def push_operations(self, operations: Tuple[OperationData, ...]) -> None: def push_rollback(self, level: int) -> None: self._queue.append(SingleLevelRollback(level)) -    async def single_level_rollback(self, level: int) -> None: +    async def _single_level_rollback(self, level: int) -> None: """Ensure the next arrived block has all operations of the previous block. But it could also contain additional operations. Called by IndexDispatcher when index datasource receives a single level rollback.
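Aside: the invariant this docstring describes reduces to set arithmetic over operation hashes; a sketch with made-up hashes (the real check lands in `_process_level_operations` in the hunks that follow).

    # Hashes seen at the rolled back level vs. the re-sent level
    expected_hashes = {'opA', 'opB', 'opC'}
    received_hashes = {'opA', 'opB', 'opC', 'opD'}

    missing_hashes = expected_hashes - received_hashes  # backtracked operations: reindex required
    new_hashes = received_hashes - expected_hashes      # only these still need processing

    assert not missing_hashes
    assert new_hashes == {'opD'}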
@@ -173,13 +175,13 @@ async def single_level_rollback(self, level: int) -> None: async def _process_queue(self) -> None: """Process WebSocket queue""" - if self._queue: - self._logger.info('Processing websocket queue') while self._queue: message = self._queue.popleft() if isinstance(message, SingleLevelRollback): - await self.single_level_rollback(message.level) + self._logger.info('Processing rollback realtime message, %s left in queue', len(self._queue)) + await self._single_level_rollback(message.level) else: + self._logger.info('Processing operations realtime message, %s left in queue', len(self._queue)) await self._process_level_operations(message) async def _synchronize(self, last_level: int, cache: bool = False) -> None: @@ -215,6 +217,8 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None: await self._exit_sync_state(last_level) async def _process_level_operations(self, operations: Tuple[OperationData, ...]) -> None: + if not operations: + return level = self._extract_level(operations) if self._rollback_level: @@ -229,16 +233,17 @@ async def _process_level_operations(self, operations: Tuple[OperationData, ...]) self._logger.info('Rolling back to previous level, verifying processed operations') expected_hashes = set(self._head_hashes) - received_hashes = set([op.hash for op in operations]) - reused_hashes = received_hashes & expected_hashes - if reused_hashes != expected_hashes: + received_hashes = set(op.hash for op in operations) + new_hashes = received_hashes - expected_hashes + missing_hashes = expected_hashes - received_hashes + + self._logger.info('Comparing hashes: %s new, %s missing', len(new_hashes), len(missing_hashes)) + if missing_hashes: + self._logger.info('Some operations are backtracked: %s', ', '.join(missing_hashes)) await self._ctx.reindex(ReindexingReason.ROLLBACK) self._rollback_level = None self._head_hashes = set() - new_hashes = received_hashes - expected_hashes - if not new_hashes: - return operations = tuple(op for op in operations if op.hash in new_hashes) # NOTE: le operator because it could be a single level rollback diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py index 887c1aa92..f0cf18e8f 100644 --- a/tests/integration_tests/test_rollback.py +++ b/tests/integration_tests/test_rollback.py @@ -1,18 +1,20 @@ import asyncio +from contextlib import ExitStack, contextmanager from datetime import datetime from os.path import dirname, join -from typing import Tuple -from unittest import IsolatedAsyncioTestCase, skip -from unittest.mock import AsyncMock, patch +from typing import Generator, Tuple +from unittest import IsolatedAsyncioTestCase +from unittest.mock import AsyncMock, Mock, patch from dipdup.config import DipDupConfig from dipdup.datasources.tzkt.datasource import TzktDatasource from dipdup.dipdup import DipDup +from dipdup.models import HeadBlockData from dipdup.models import Index as State from dipdup.models import OperationData # import logging -# logging.basicConfig(level=logging.DEBUG) +# logging.basicConfig(level=logging.INFO) def _get_operation(hash_: str, level: int) -> OperationData: @@ -35,23 +37,24 @@ def _get_operation(hash_: str, level: int) -> OperationData: initial_level = 1365000 +next_level = initial_level + 1 exact_operations = ( - _get_operation('1', 1365001), - _get_operation('2', 1365001), - _get_operation('3', 1365001), + _get_operation('1', next_level), + _get_operation('2', next_level), + _get_operation('3', next_level), ) less_operations = ( - 
_get_operation('1', 1365001), - _get_operation('2', 1365001), + _get_operation('1', next_level), + _get_operation('2', next_level), ) more_operations = ( - _get_operation('1', 1365001), - _get_operation('2', 1365001), - _get_operation('3', 1365001), - _get_operation('4', 1365001), + _get_operation('1', next_level), + _get_operation('2', next_level), + _get_operation('3', next_level), + _get_operation('4', next_level), ) @@ -68,11 +71,14 @@ async def emit_messages( ): await self.emit_operations(old_block) await self.emit_rollback( - from_level=1365001, - to_level=1365001 - level, + from_level=next_level, + to_level=next_level - level, ) await self.emit_operations(new_block) + for _ in range(10): + await asyncio.sleep(0.1) + raise asyncio.CancelledError @@ -101,35 +107,60 @@ async def datasource_run_deep(self: TzktDatasource): await check_level(initial_level + 1) -class RollbackTest(IsolatedAsyncioTestCase): - async def asyncSetUp(self) -> None: - self.config = DipDupConfig.load([join(dirname(__file__), 'hic_et_nunc.yml')]) - self.config.database.path = ':memory:' # type: ignore - self.config.indexes['hen_mainnet'].last_level = self.config.indexes['hen_mainnet'].first_level + 1 # type: ignore - self.config.initialize() - self.dipdup = DipDup(self.config) +head = Mock(spec=HeadBlockData) +head.level = initial_level + + +@contextmanager +def patch_dipdup(datasource_run) -> Generator: + with ExitStack() as stack: + stack.enter_context(patch('dipdup.index.OperationIndex._synchronize', AsyncMock())) + stack.enter_context(patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run)) + stack.enter_context(patch('dipdup.context.DipDupContext.reindex', AsyncMock())) + stack.enter_context(patch('dipdup.datasources.tzkt.datasource.TzktDatasource.get_head_block', AsyncMock(return_value=head))) + yield + + +def get_dipdup() -> DipDup: + config = DipDupConfig.load([join(dirname(__file__), 'hic_et_nunc.yml')]) + config.database.path = ':memory:' # type: ignore + config.indexes['hen_mainnet'].last_level = 0 # type: ignore + config.initialize() + return DipDup(config) + +class RollbackTest(IsolatedAsyncioTestCase): async def test_rollback_exact(self): - with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_exact): - await self.dipdup.run(False, False, False) + with patch_dipdup(datasource_run_exact): + dipdup = get_dipdup() + await dipdup.run(False, False, False) + + assert dipdup._ctx.reindex.call_count == 0 async def test_rollback_more(self): - with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_more): - await self.dipdup.run(False, False, False) + with patch_dipdup(datasource_run_more): + dipdup = get_dipdup() + await dipdup.run(False, False, False) + + assert dipdup._ctx.reindex.call_count == 0 - @skip('FIXME') async def test_rollback_less(self): - with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_less): - with patch('dipdup.context.DipDupContext.reindex', AsyncMock()) as reindex_mock: - await self.dipdup.run(False, False, False) - assert reindex_mock.call_count == 1 + with patch_dipdup(datasource_run_less): + dipdup = get_dipdup() + await dipdup.run(False, False, False) + + assert dipdup._ctx.reindex.call_count == 1 async def test_rollback_zero(self): - with patch('dipdup.datasources.tzkt.datasource.TzktDatasource.run', datasource_run_zero): - await self.dipdup.run(False, False, False) + with patch_dipdup(datasource_run_zero): + dipdup = get_dipdup() + await dipdup.run(False, False, False) + + 
assert dipdup._ctx.reindex.call_count == 0 async def test_rollback_deep(self): with patch_dipdup(datasource_run_deep): dipdup = get_dipdup() await dipdup.run(False, False, False) + +        assert dipdup._ctx.reindex.call_count == 1 From df988cd4aac172acfd9316db171f4541ae68398f Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sun, 3 Oct 2021 17:29:07 +0300 Subject: [PATCH 28/29] InitializationRequiredError on create_package failure --- src/dipdup/cli.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py index c45ec73e7..63eba3710 100644 --- a/src/dipdup/cli.py +++ b/src/dipdup/cli.py @@ -20,7 +20,7 @@ from dipdup.codegen import DEFAULT_DOCKER_ENV_FILE, DEFAULT_DOCKER_IMAGE, DEFAULT_DOCKER_TAG, DipDupCodeGenerator from dipdup.config import DipDupConfig, LoggingConfig, PostgresDatabaseConfig from dipdup.dipdup import DipDup -from dipdup.exceptions import ConfigurationError, DeprecatedHandlerError, DipDupError, MigrationRequiredError +from dipdup.exceptions import ConfigurationError, DeprecatedHandlerError, DipDupError, InitializationRequiredError, MigrationRequiredError from dipdup.hasura import HasuraGateway from dipdup.migrations import DipDupMigrationManager, deprecated_handlers from dipdup.utils.database import set_decimal_context, tortoise_wrapper @@ -113,8 +113,10 @@ async def cli(ctx, config: List[str], env_file: List[str], logging_config: str): _config = DipDupConfig.load(config) init_sentry(_config) -    # TODO: Raise InitRequiredError on failure -    await DipDupCodeGenerator(_config, {}).create_package() +    try: +        await DipDupCodeGenerator(_config, {}).create_package() +    except Exception as e: +        raise InitializationRequiredError from e if _config.spec_version not in spec_version_mapping: raise ConfigurationError(f'Unknown `spec_version`, correct ones: {", ".join(spec_version_mapping)}') From bcfb62be4208905a4780a3f9d8eb3890c68f23d2 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Mon, 4 Oct 2021 10:38:51 +0300 Subject: [PATCH 29/29] Final touches, changelog --- CHANGELOG.md | 12 ++++++++---- src/dipdup/datasources/tzkt/datasource.py | 10 +++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fde07aec9..bb9deffb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,13 +4,17 @@ ## [unreleased] ### Improved -* Improved indexing performance. +* Significantly increased indexing speed. ### Fixed -* Fixed processing zero- and single-level rollbacks. -* Removed unnecessary file IO calls causing `PermissionError` exception in Docker environment. -* Fixed possible atomicity violation during real-time indexing. +* Fixed unexpected reindexing caused by the bug in processing zero- and single-level rollbacks. +* Removed unnecessary file IO calls that could cause `PermissionError` exception in Docker environments. +* Fixed possible violation of block-level atomicity during real-time indexing. + +### Changed + +* Public methods of `TzktDatasource` now return immutable sequences.
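Aside: a toy illustration of what the "immutable sequences" note above buys; this is not dipdup code.

    from typing import List, Tuple

    def contracts_as_list() -> List[str]:
        return ['KT1a', 'KT1b']

    def contracts_as_tuple() -> Tuple[str, ...]:
        return ('KT1a', 'KT1b')

    # A list result invites accidental in-place mutation...
    mutable = contracts_as_list()
    mutable.append('KT1c')  # silently diverges from what was fetched

    # ...a tuple result forces an explicit copy instead
    frozen = contracts_as_tuple()
    extended = frozen + ('KT1c',)  # new tuple; `frozen` stays untouched
    assert frozen == ('KT1a', 'KT1b') and extended[-1] == 'KT1c'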
## 3.0.3 - 2021-10-01 diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py index baee79ee9..a897a6f04 100644 --- a/src/dipdup/datasources/tzkt/datasource.py +++ b/src/dipdup/datasources/tzkt/datasource.py @@ -228,7 +228,7 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, Tuple[BigMa for i in range(len(big_maps) - 1): if big_maps[i].level != big_maps[i + 1].level: yield big_maps[i].level, tuple(big_maps[: i + 1]) - big_maps = big_maps[i + 1 :] # noqa: E203 + big_maps = big_maps[i + 1 :] break else: break @@ -285,7 +285,7 @@ def request_limit(self) -> int: def sync_level(self) -> Optional[int]: return self._sync_level - async def get_similar_contracts(self, address: str, strict: bool = False) -> List[str]: + async def get_similar_contracts(self, address: str, strict: bool = False) -> Tuple[str, ...]: """Get list of contracts sharing the same code hash or type hash""" entrypoint = 'same' if strict else 'similar' self._logger.info('Fetching %s contracts for address `%s', entrypoint, address) @@ -298,9 +298,9 @@ async def get_similar_contracts(self, address: str, strict: bool = False) -> Lis limit=self.request_limit, ), ) - return contracts + return tuple(c for c in contracts) - async def get_originated_contracts(self, address: str) -> List[str]: + async def get_originated_contracts(self, address: str) -> Tuple[str, ...]: """Get contracts originated from given address""" self._logger.info('Fetching originated contracts for address `%s', address) contracts = await self._http.request( @@ -310,7 +310,7 @@ async def get_originated_contracts(self, address: str) -> List[str]: limit=self.request_limit, ), ) - return [c['address'] for c in contracts] + return tuple(c['address'] for c in contracts) async def get_contract_summary(self, address: str) -> Dict[str, Any]: """Get contract summary"""
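Aside: the `strict` flag in `get_similar_contracts` above only switches the TzKT endpoint; a sketch of that selection, where the path layout is an assumption for illustration, not TzKT's documented URL scheme.

    def similar_contracts_path(address: str, strict: bool = False) -> str:
        # Roughly: `same` matches contracts by code hash, `similar` by type hash
        entrypoint = 'same' if strict else 'similar'
        return f'v1/contracts/{address}/{entrypoint}'

    assert similar_contracts_path('KT1abc').endswith('/similar')
    assert similar_contracts_path('KT1abc', strict=True).endswith('/same')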