From d2f38b0f72fb5b12023abc6ef5bb607b15ca76bc Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 10:26:53 +0300 Subject: [PATCH 01/10] Revert "Use single postgres connection, enable parallel index processing (#77)" This reverts commit 144db3204841d2470dda5831f7fdd5218ddd6bd0. --- src/dipdup/dipdup.py | 4 +++- src/dipdup/exceptions.py | 8 +++----- src/dipdup/index.py | 9 +++------ 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 4c576363b..06f745f10 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -143,8 +143,10 @@ async def run(self, oneshot=False) -> None: while not self._stopped: await self.reload_config() + # FIXME: Process all indexes in parallel, blocked by https://github.com/tortoise/tortoise-orm/issues/792 async with utils.slowdown(INDEX_DISPATCHER_INTERVAL): - await asyncio.gather(*[index.process() for index in self._indexes.values()]) + for index in self._indexes.values(): + await index.process() # TODO: Continue if new indexes are spawned from origination if oneshot: diff --git a/src/dipdup/exceptions.py b/src/dipdup/exceptions.py index da1d7e801..ae4bfcc6d 100644 --- a/src/dipdup/exceptions.py +++ b/src/dipdup/exceptions.py @@ -1,7 +1,7 @@ import traceback from abc import ABC, abstractmethod -from typing import Optional, Type, Any from pprint import pformat +from typing import Any, Optional, Type from tabulate import tabulate @@ -144,7 +144,7 @@ def format_help(self) -> str: class InvalidDataError(DipDupError): """Failed to validate operation/big_map data against a generated type class""" - def __init__(self, data: Any, type_cls: Type, error_context : Optional[Any] = None) -> None: + def __init__(self, data: Any, type_cls: Type, error_context: Optional[Any] = None) -> None: super().__init__(None) self.data = data self.type_name = type_cls.__name__ @@ -152,7 +152,5 @@ def __init__(self, data: Any, type_cls: Type, error_context : Optional[Any] = No def format_help(self) -> str: return _data_validation_error.format( - invalid_data=pformat(self.data, compact=True), - type_name=self.type_name, - error_context=pformat(self.error_context, compact=True) + invalid_data=pformat(self.data, compact=True), type_name=self.type_name, error_context=pformat(self.error_context, compact=True) ) diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 1bdee86ca..54cdfea44 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -3,6 +3,7 @@ from collections import defaultdict, deque, namedtuple from contextlib import suppress from typing import Deque, Dict, List, Optional, Set, Tuple, Union, cast + from pydantic.error_wrappers import ValidationError from dipdup.config import ( @@ -19,9 +20,9 @@ ) from dipdup.context import DipDupContext, HandlerContext from dipdup.datasources.tzkt.datasource import BigMapFetcher, OperationFetcher, TzktDatasource +from dipdup.exceptions import InvalidDataError from dipdup.models import BigMapData, BigMapDiff, HeadBlockData, OperationData, Origination, State, TemporaryState, Transaction from dipdup.utils import FormattedLogger, in_global_transaction -from dipdup.exceptions import InvalidDataError # NOTE: Operations of a single contract call OperationSubgroup = namedtuple('OperationSubgroup', ('hash', 'counter')) @@ -297,11 +298,7 @@ async def _on_match( try: parameter = parameter_type.parse_obj(operation.parameter_json) if parameter_type else None except ValidationError as e: - error_context = dict( - hash=operation.hash, - counter=operation.counter, - 
nonce=operation.nonce - ) + error_context = dict(hash=operation.hash, counter=operation.counter, nonce=operation.nonce) raise InvalidDataError(operation.parameter_json, parameter_type, error_context) from e storage_type = pattern_config.storage_type_cls From bfee3f2104b22914f348fc1c008940a862754207 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 10:53:10 +0300 Subject: [PATCH 02/10] Add environment option --- src/dipdup/cli.py | 1 + src/dipdup/config.py | 1 + 2 files changed, 2 insertions(+) diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py index 177f14d0e..b750d95b7 100644 --- a/src/dipdup/cli.py +++ b/src/dipdup/cli.py @@ -66,6 +66,7 @@ async def cli(ctx, config: List[str], logging_config: str): if _config.sentry: sentry_sdk.init( dsn=_config.sentry.dsn, + environment=_config.sentry.environment, integrations=[AioHttpIntegration()], ) diff --git a/src/dipdup/config.py b/src/dipdup/config.py index 85db79993..2b740e189 100644 --- a/src/dipdup/config.py +++ b/src/dipdup/config.py @@ -674,6 +674,7 @@ class JobConfig(HandlerConfig): @dataclass class SentryConfig: dsn: str + environment: Optional[str] = None @dataclass From 5a142b0c0e25c8da0ae9e362d16bec0e7bc53e61 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 11:14:37 +0300 Subject: [PATCH 03/10] `interval` job config --- src/dipdup/config.py | 8 +++++++- src/dipdup/scheduler.py | 6 +++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/dipdup/config.py b/src/dipdup/config.py index 2b740e189..65bb92679 100644 --- a/src/dipdup/config.py +++ b/src/dipdup/config.py @@ -666,10 +666,16 @@ def headers(self) -> Dict[str, str]: @dataclass class JobConfig(HandlerConfig): - crontab: str + crontab: Optional[str] = None + interval: Optional[int] = None args: Optional[Dict[str, Any]] = None atomic: bool = False + def __post_init_post_parse__(self): + if int(bool(self.crontab)) + int(bool(self.interval)) != 1: + raise ConfigurationError('Either `interval` or `crontab` field must be specified') + super().__post_init_post_parse__() + @dataclass class SentryConfig: diff --git a/src/dipdup/scheduler.py b/src/dipdup/scheduler.py index 91ca4062c..54479e2ae 100644 --- a/src/dipdup/scheduler.py +++ b/src/dipdup/scheduler.py @@ -2,6 +2,7 @@ from apscheduler.jobstores.memory import MemoryJobStore # type: ignore from apscheduler.schedulers.asyncio import AsyncIOScheduler # type: ignore from apscheduler.triggers.cron import CronTrigger # type: ignore +from apscheduler.triggers.interval import IntervalTrigger # type: ignore from pytz import utc from dipdup.config import JobConfig @@ -34,7 +35,10 @@ async def _atomic_wrapper(ctx, args): async with in_global_transaction(): await job_config.callback_fn(ctx, args) - trigger = CronTrigger.from_crontab(job_config.crontab) + if job_config.crontab: + trigger = CronTrigger.from_crontab(job_config.crontab) + elif job_config.interval: + trigger = IntervalTrigger(seconds=job_config.interval) scheduler.add_job( func=_atomic_wrapper if job_config.atomic else job_config.callback_fn, id=job_name, From d076d922465c7f42ec632436542687e23eaf7d34 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 13:07:04 +0300 Subject: [PATCH 04/10] Update tortoise --- poetry.lock | 45 ++++++++++++++++++++++++++++++--------------- pyproject.toml | 2 +- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/poetry.lock b/poetry.lock index 004fd582b..c9a4c0a0c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -186,6 +186,17 @@ category = "main" optional = false 
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +[[package]] +name = "charset-normalizer" +version = "2.0.1" +description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." +category = "main" +optional = false +python-versions = ">=3.5.0" + +[package.extras] +unicode_backport = ["unicodedata2"] + [[package]] name = "click" version = "8.0.1" @@ -353,11 +364,11 @@ python-versions = "*" [[package]] name = "idna" -version = "2.10" +version = "3.2" description = "Internationalized Domain Names in Applications (IDNA)" category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +python-versions = ">=3.5" [[package]] name = "inflect" @@ -738,21 +749,21 @@ python-versions = "*" [[package]] name = "requests" -version = "2.25.1" +version = "2.26.0" description = "Python HTTP for Humans." category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" [package.dependencies] certifi = ">=2017.4.17" -chardet = ">=3.0.2,<5" -idna = ">=2.5,<3" +charset-normalizer = {version = ">=2.0.0,<2.1.0", markers = "python_version >= \"3\""} +idna = {version = ">=2.5,<4", markers = "python_version >= \"3\""} urllib3 = ">=1.21.1,<1.27" [package.extras] -security = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)"] socks = ["PySocks (>=1.5.6,!=1.5.7)", "win-inet-pton"] +use_chardet_on_py3 = ["chardet (>=3.0.2,<5)"] [[package]] name = "ruamel.yaml" @@ -843,7 +854,7 @@ python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" [[package]] name = "tortoise-orm" -version = "0.17.4" +version = "0.17.5" description = "Easy async ORM for python, built with relations in mind" category = "main" optional = false @@ -925,7 +936,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "86526b8683ffa2171af38d8b8ebae7cf8c05554654cc7aa8a16e62b1c5731d01" +content-hash = "6c5e0448688276ae10827af7685494d1e908f11b71c8a50ccd2fb334f62ae939" [metadata.files] aiohttp = [ @@ -1035,6 +1046,10 @@ chardet = [ {file = "chardet-4.0.0-py2.py3-none-any.whl", hash = "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5"}, {file = "chardet-4.0.0.tar.gz", hash = "sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa"}, ] +charset-normalizer = [ + {file = "charset-normalizer-2.0.1.tar.gz", hash = "sha256:ad0da505736fc7e716a8da15bf19a985db21ac6415c26b34d2fafd3beb3d927e"}, + {file = "charset_normalizer-2.0.1-py3-none-any.whl", hash = "sha256:b68b38179052975093d71c1b5361bf64afd80484697c1f27056e50593e695ceb"}, +] click = [ {file = "click-8.0.1-py3-none-any.whl", hash = "sha256:fba402a4a47334742d782209a7c79bc448911afe1149d07bdabdf480b3e2f4b6"}, {file = "click-8.0.1.tar.gz", hash = "sha256:8c04c11192119b1ef78ea049e0a6f0463e4c48ef00a30160c704337586f3ad7a"}, @@ -1133,8 +1148,8 @@ genson = [ {file = "genson-1.2.2.tar.gz", hash = "sha256:8caf69aa10af7aee0e1a1351d1d06801f4696e005f06cedef438635384346a16"}, ] idna = [ - {file = "idna-2.10-py2.py3-none-any.whl", hash = "sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0"}, - {file = "idna-2.10.tar.gz", hash = "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6"}, + {file = "idna-3.2-py3-none-any.whl", hash = "sha256:14475042e284991034cb48e06f6851428fb14c4dc953acd9be9a5e95c7b6dd7a"}, + {file = "idna-3.2.tar.gz", hash = 
"sha256:467fbad99067910785144ce333826c71fb0e63a425657295239737f7ecd125f3"}, ] inflect = [ {file = "inflect-5.3.0-py3-none-any.whl", hash = "sha256:42560be16af702a21d43d59427f276b5aed79efb1ded9b713468c081f4353d10"}, @@ -1493,8 +1508,8 @@ regex = [ {file = "regex-2021.7.6.tar.gz", hash = "sha256:8394e266005f2d8c6f0bc6780001f7afa3ef81a7a2111fa35058ded6fce79e4d"}, ] requests = [ - {file = "requests-2.25.1-py2.py3-none-any.whl", hash = "sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e"}, - {file = "requests-2.25.1.tar.gz", hash = "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804"}, + {file = "requests-2.26.0-py2.py3-none-any.whl", hash = "sha256:6c1246513ecd5ecd4528a0906f910e8f0f9c6b8ec72030dc9fd154dc1a6efd24"}, + {file = "requests-2.26.0.tar.gz", hash = "sha256:b8aa58f8cf793ffd8782d3d8cb19e66ef36f7aba4353eec859e74678b01b07a7"}, ] "ruamel.yaml" = [ {file = "ruamel.yaml-0.17.10-py3-none-any.whl", hash = "sha256:ffb9b703853e9e8b7861606dfdab1026cf02505bade0653d1880f4b2db47f815"}, @@ -1544,8 +1559,8 @@ toml = [ {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, ] tortoise-orm = [ - {file = "tortoise-orm-0.17.4.tar.gz", hash = "sha256:8314a9ae63d3f009bac5da3e7d1f7e3f2de8f9bad43ce1efcd3e059209cd3f9d"}, - {file = "tortoise_orm-0.17.4-py3-none-any.whl", hash = "sha256:f052b6089e30748afec88669f1a1cf01a3662cdac81cf5427dfb338839ad6027"}, + {file = "tortoise-orm-0.17.5.tar.gz", hash = "sha256:65a930e6e6050866dc18a7d251a77a6dd2616e814da3ede8bda990147fa6b7d5"}, + {file = "tortoise_orm-0.17.5-py3-none-any.whl", hash = "sha256:978ec824837b44373fb1b3669d443d823c71b080e39db37db72355fde6cadc24"}, ] typed-ast = [ {file = "typed_ast-1.4.3-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:2068531575a125b87a41802130fa7e29f26c09a2833fea68d9a40cf33902eba6"}, diff --git a/pyproject.toml b/pyproject.toml index 847927bd7..adb2470a1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ aiohttp = "^3.7.4" asyncpg = "0.23.0" datamodel-code-generator = "^0.11.1" "ruamel.yaml" = "^0.17.2" -tortoise-orm = "0.17.4" +tortoise-orm = "0.17.5" pydantic = "^1.8.1" aiosignalrcore = "^0.9.2" fcache = "^0.4.7" From 411607949227ea41d4365a55db55310e3db1ac08 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 13:07:15 +0300 Subject: [PATCH 05/10] Revert "Revert "Use single postgres connection, enable parallel index processing (#77)"" This reverts commit d2f38b0f72fb5b12023abc6ef5bb607b15ca76bc. 
--- src/dipdup/dipdup.py | 4 +--- src/dipdup/exceptions.py | 8 +++++--- src/dipdup/index.py | 9 ++++++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 06f745f10..4c576363b 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -143,10 +143,8 @@ async def run(self, oneshot=False) -> None: while not self._stopped: await self.reload_config() - # FIXME: Process all indexes in parallel, blocked by https://github.com/tortoise/tortoise-orm/issues/792 async with utils.slowdown(INDEX_DISPATCHER_INTERVAL): - for index in self._indexes.values(): - await index.process() + await asyncio.gather(*[index.process() for index in self._indexes.values()]) # TODO: Continue if new indexes are spawned from origination if oneshot: diff --git a/src/dipdup/exceptions.py b/src/dipdup/exceptions.py index ae4bfcc6d..da1d7e801 100644 --- a/src/dipdup/exceptions.py +++ b/src/dipdup/exceptions.py @@ -1,7 +1,7 @@ import traceback from abc import ABC, abstractmethod +from typing import Optional, Type, Any from pprint import pformat -from typing import Any, Optional, Type from tabulate import tabulate @@ -144,7 +144,7 @@ def format_help(self) -> str: class InvalidDataError(DipDupError): """Failed to validate operation/big_map data against a generated type class""" - def __init__(self, data: Any, type_cls: Type, error_context: Optional[Any] = None) -> None: + def __init__(self, data: Any, type_cls: Type, error_context : Optional[Any] = None) -> None: super().__init__(None) self.data = data self.type_name = type_cls.__name__ @@ -152,5 +152,7 @@ def __init__(self, data: Any, type_cls: Type, error_context: Optional[Any] = Non def format_help(self) -> str: return _data_validation_error.format( - invalid_data=pformat(self.data, compact=True), type_name=self.type_name, error_context=pformat(self.error_context, compact=True) + invalid_data=pformat(self.data, compact=True), + type_name=self.type_name, + error_context=pformat(self.error_context, compact=True) ) diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 54cdfea44..1bdee86ca 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -3,7 +3,6 @@ from collections import defaultdict, deque, namedtuple from contextlib import suppress from typing import Deque, Dict, List, Optional, Set, Tuple, Union, cast - from pydantic.error_wrappers import ValidationError from dipdup.config import ( @@ -20,9 +19,9 @@ ) from dipdup.context import DipDupContext, HandlerContext from dipdup.datasources.tzkt.datasource import BigMapFetcher, OperationFetcher, TzktDatasource -from dipdup.exceptions import InvalidDataError from dipdup.models import BigMapData, BigMapDiff, HeadBlockData, OperationData, Origination, State, TemporaryState, Transaction from dipdup.utils import FormattedLogger, in_global_transaction +from dipdup.exceptions import InvalidDataError # NOTE: Operations of a single contract call OperationSubgroup = namedtuple('OperationSubgroup', ('hash', 'counter')) @@ -298,7 +297,11 @@ async def _on_match( try: parameter = parameter_type.parse_obj(operation.parameter_json) if parameter_type else None except ValidationError as e: - error_context = dict(hash=operation.hash, counter=operation.counter, nonce=operation.nonce) + error_context = dict( + hash=operation.hash, + counter=operation.counter, + nonce=operation.nonce + ) raise InvalidDataError(operation.parameter_json, parameter_type, error_context) from e storage_type = pattern_config.storage_type_cls From f66fd361fa739f622dd3543bbebfba2cb427f1fd Mon 
Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 13:49:45 +0300 Subject: [PATCH 06/10] Do not request genesis block --- src/dipdup/exceptions.py | 6 +++--- src/dipdup/index.py | 21 ++++++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/dipdup/exceptions.py b/src/dipdup/exceptions.py index da1d7e801..ec1e9f407 100644 --- a/src/dipdup/exceptions.py +++ b/src/dipdup/exceptions.py @@ -1,7 +1,7 @@ import traceback from abc import ABC, abstractmethod -from typing import Optional, Type, Any from pprint import pformat +from typing import Any, Optional, Type from tabulate import tabulate @@ -144,7 +144,7 @@ def format_help(self) -> str: class InvalidDataError(DipDupError): """Failed to validate operation/big_map data against a generated type class""" - def __init__(self, data: Any, type_cls: Type, error_context : Optional[Any] = None) -> None: + def __init__(self, data: Any, type_cls: Type, error_context: Optional[Any] = None) -> None: super().__init__(None) self.data = data self.type_name = type_cls.__name__ @@ -154,5 +154,5 @@ def format_help(self) -> str: return _data_validation_error.format( invalid_data=pformat(self.data, compact=True), type_name=self.type_name, - error_context=pformat(self.error_context, compact=True) + error_context=pformat(self.error_context, compact=True), ) diff --git a/src/dipdup/index.py b/src/dipdup/index.py index 1bdee86ca..4bd21a585 100644 --- a/src/dipdup/index.py +++ b/src/dipdup/index.py @@ -3,6 +3,7 @@ from collections import defaultdict, deque, namedtuple from contextlib import suppress from typing import Deque, Dict, List, Optional, Set, Tuple, Union, cast + from pydantic.error_wrappers import ValidationError from dipdup.config import ( @@ -19,9 +20,9 @@ ) from dipdup.context import DipDupContext, HandlerContext from dipdup.datasources.tzkt.datasource import BigMapFetcher, OperationFetcher, TzktDatasource +from dipdup.exceptions import InvalidDataError from dipdup.models import BigMapData, BigMapDiff, HeadBlockData, OperationData, Origination, State, TemporaryState, Transaction from dipdup.utils import FormattedLogger, in_global_transaction -from dipdup.exceptions import InvalidDataError # NOTE: Operations of a single contract call OperationSubgroup = namedtuple('OperationSubgroup', ('hash', 'counter')) @@ -90,13 +91,15 @@ async def _initialize_index_state(self) -> None: self._logger.warning('Config hash mismatch (config has been changed), reindexing') await self._ctx.reindex() - block = await self._datasource.get_block(state.level) - if state.hash: - if state.hash != block.hash: - self._logger.warning('Block hash mismatch (missed rollback while dipdup was stopped), reindexing') - await self._ctx.reindex() - else: - state.hash = block.hash # type: ignore + # NOTE: No need to check genesis block + if state.level: + block = await self._datasource.get_block(state.level) + if state.hash: + if state.hash != block.hash: + self._logger.warning('Block hash mismatch (missed rollback while dipdup was stopped), reindexing') + await self._ctx.reindex() + else: + state.hash = block.hash # type: ignore await state.save() self._state = state @@ -300,7 +303,7 @@ async def _on_match( error_context = dict( hash=operation.hash, counter=operation.counter, - nonce=operation.nonce + nonce=operation.nonce, ) raise InvalidDataError(operation.parameter_json, parameter_type, error_context) from e From 78cffebec58d30d9672165ba580b7d1a2757cb9d Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 15:54:29 +0300 Subject: 
[PATCH 07/10] Set decimal context --- src/dipdup/cli.py | 4 +++- src/dipdup/dipdup.py | 8 ++++---- src/dipdup/hasura.py | 34 ++++++---------------------------- src/dipdup/utils.py | 42 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 54 insertions(+), 34 deletions(-) diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py index b750d95b7..dc36a5bba 100644 --- a/src/dipdup/cli.py +++ b/src/dipdup/cli.py @@ -17,7 +17,7 @@ from dipdup.dipdup import DipDup from dipdup.exceptions import ConfigurationError, DipDupError, MigrationRequiredError from dipdup.hasura import HasuraGateway -from dipdup.utils import tortoise_wrapper +from dipdup.utils import set_decimal_context, tortoise_wrapper _logger = logging.getLogger('dipdup.cli') @@ -70,6 +70,8 @@ async def cli(ctx, config: List[str], logging_config: str): integrations=[AioHttpIntegration()], ) + set_decimal_context(_config.package) + ctx.obj = CLIContext( config_paths=config, config=_config, diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 4c576363b..92d7d7802 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -13,7 +13,6 @@ from tortoise.transactions import get_connection from tortoise.utils import get_schema_sql -import dipdup.utils as utils from dipdup.codegen import DipDupCodeGenerator from dipdup.config import ( ROLLBACK_HANDLER, @@ -38,6 +37,7 @@ from dipdup.hasura import HasuraGateway from dipdup.index import BigMapIndex, Index, OperationIndex from dipdup.models import BigMapData, HeadBlockData, IndexType, OperationData, State +from dipdup.utils import FormattedLogger, slowdown, tortoise_wrapper INDEX_DISPATCHER_INTERVAL = 1.0 from dipdup.scheduler import add_job, create_scheduler @@ -105,7 +105,7 @@ async def dispatch_big_maps(self, datasource: TzktDatasource, big_maps: List[Big index.push(level, big_maps) async def _rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None: - logger = utils.FormattedLogger(ROLLBACK_HANDLER) + logger = FormattedLogger(ROLLBACK_HANDLER) if from_level - to_level == 1: # NOTE: Single level rollbacks are processed at Index level. 
# NOTE: Notify all indexes with rolled back datasource to skip next level and just verify it @@ -143,7 +143,7 @@ async def run(self, oneshot=False) -> None: while not self._stopped: await self.reload_config() - async with utils.slowdown(INDEX_DISPATCHER_INTERVAL): + async with slowdown(INDEX_DISPATCHER_INTERVAL): await asyncio.gather(*[index.process() for index in self._indexes.values()]) # TODO: Continue if new indexes are spawned from origination @@ -193,7 +193,7 @@ async def run(self, reindex: bool, oneshot: bool) -> None: url = self._config.database.connection_string models = f'{self._config.package}.models' - async with utils.tortoise_wrapper(url, models): + async with tortoise_wrapper(url, models): await self._initialize_database(reindex) await self._create_datasources() await self._configure() diff --git a/src/dipdup/hasura.py b/src/dipdup/hasura.py index 95edf37fb..bc70ee851 100644 --- a/src/dipdup/hasura.py +++ b/src/dipdup/hasura.py @@ -4,19 +4,19 @@ from contextlib import suppress from os import listdir from os.path import dirname, join -from types import ModuleType -from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Type +from typing import Any, Dict, Iterator, List, Optional, Tuple import humps # type: ignore from aiohttp import ClientConnectorError, ClientOSError from genericpath import exists from pydantic.dataclasses import dataclass -from tortoise import Model, fields +from tortoise import fields from tortoise.transactions import get_connection from dipdup.config import HasuraConfig, HTTPConfig, PostgresDatabaseConfig, pascal_to_snake from dipdup.exceptions import ConfigurationError from dipdup.http import HTTPGateway +from dipdup.utils import iter_models @dataclass @@ -31,21 +31,6 @@ def camelize(self) -> 'Field': ) -def _is_model_class(obj: Any) -> bool: - """Is subclass of tortoise.Model, but not the base class""" - return isinstance(obj, type) and issubclass(obj, Model) and obj != Model and not getattr(obj.Meta, 'abstract', False) - - -def _iter_models(modules: Iterable[ModuleType]) -> Iterator[Tuple[str, Type[Model]]]: - """Iterate over built-in and project's models""" - for models in modules: - for attr in dir(models): - model = getattr(models, attr) - if _is_model_class(model): - app = 'int_models' if models.__name__ == 'dipdup.models' else 'models' - yield app, model - - class HasuraError(RuntimeError): ... @@ -197,11 +182,6 @@ async def _get_views(self) -> List[str]: )[1] ] - def _get_model_modules(self) -> Tuple[ModuleType, ModuleType]: - int_models = importlib.import_module('dipdup.models') - models = importlib.import_module(f'{self._package}.models') - return int_models, models - async def _generate_source_tables_metadata(self) -> List[Dict[str, Any]]: """Generate source tables metadata based on project models and views. 
@@ -213,9 +193,8 @@ async def _generate_source_tables_metadata(self) -> List[Dict[str, Any]]: metadata_tables = {} model_tables = {} - model_modules = self._get_model_modules() - for app, model in _iter_models(model_modules): + for app, model in iter_models(self._package): table_name = model._meta.db_table or pascal_to_snake(model.__name__) model_tables[f'{app}.{model.__name__}'] = table_name metadata_tables[table_name] = self._format_table(table_name) @@ -223,7 +202,7 @@ async def _generate_source_tables_metadata(self) -> List[Dict[str, Any]]: for view in views: metadata_tables[view] = self._format_table(view) - for app, model in _iter_models(model_modules): + for app, model in iter_models(self._package): table_name = model_tables[f'{app}.{model.__name__}'] for field in model._meta.fields_map.values(): @@ -262,8 +241,7 @@ def _iterate_graphql_queries(self) -> Iterator[Tuple[str, str]]: async def _generate_query_collections_metadata(self) -> List[Dict[str, Any]]: queries = [] - model_modules = self._get_model_modules() - for _, model in _iter_models(model_modules): + for _, model in iter_models(self._package): table_name = model._meta.db_table or pascal_to_snake(model.__name__) for field_name, field in model._meta.fields_map.items(): diff --git a/src/dipdup/utils.py b/src/dipdup/utils.py index e3f5659a1..58370440c 100644 --- a/src/dipdup/utils.py +++ b/src/dipdup/utils.py @@ -1,15 +1,20 @@ import asyncio +import decimal +import importlib import logging import re import time from contextlib import asynccontextmanager from logging import Logger -from typing import Any, AsyncIterator, Iterator, List, Optional +from types import ModuleType +from typing import Any, AsyncIterator, Iterable, Iterator, List, Optional, Tuple, Type from tortoise import Tortoise from tortoise.backends.asyncpg.client import AsyncpgDBClient from tortoise.backends.base.client import TransactionContext from tortoise.backends.sqlite.client import SqliteClient +from tortoise.fields import DecimalField +from tortoise.models import Model from tortoise.transactions import in_transaction _logger = logging.getLogger('dipdup.utils') @@ -98,6 +103,41 @@ async def in_global_transaction(): Tortoise._connections['default'] = original_conn +def is_model_class(obj: Any) -> bool: + """Is subclass of tortoise.Model, but not the base class""" + return isinstance(obj, type) and issubclass(obj, Model) and obj != Model and not getattr(obj.Meta, 'abstract', False) + + +# TODO: Cache me +def iter_models(package: str) -> Iterator[Tuple[str, Type[Model]]]: + """Iterate over built-in and project's models""" + dipdup_models = importlib.import_module('dipdup.models') + package_models = importlib.import_module(f'{package}.models') + + for models in (dipdup_models, package_models): + for attr in dir(models): + model = getattr(models, attr) + if is_model_class(model): + app = 'int_models' if models.__name__ == 'dipdup.models' else 'models' + yield app, model + + +def set_decimal_context(package: str) -> None: + context = decimal.getcontext() + prec = context.prec + for _, model in iter_models(package): + for field in model._meta.fields_map.values(): + if isinstance(field, DecimalField): + context.prec = max(context.prec, field.max_digits + field.max_digits) + if prec < context.prec: + _logger.warning( + 'Decimal context precision has been updated: %s -> %s', prec, context.prec + ) + # NOTE: DefaultContext used for new threads + decimal.DefaultContext.prec = context.prec + decimal.setcontext(context) + + class FormattedLogger(Logger): def __init__( 
self, From 9980ec24ad620245b6621b3b8af179dfff268bee Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 16:35:12 +0300 Subject: [PATCH 08/10] Fix graceful shutdown --- src/dipdup/cli.py | 8 ++-- src/dipdup/config.py | 5 +- src/dipdup/dipdup.py | 66 +++++++++++++------------- src/dipdup/http.py | 22 ++++++++- src/dipdup/utils.py | 7 +-- tests/integration_tests/test_hasura.py | 1 - 6 files changed, 64 insertions(+), 45 deletions(-) diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py index dc36a5bba..f32148b85 100644 --- a/src/dipdup/cli.py +++ b/src/dipdup/cli.py @@ -146,10 +146,8 @@ async def configure_hasura(ctx, reset: bool): if not config.hasura: _logger.error('`hasura` config section is empty') return - hasura = HasuraGateway(config.package, config.hasura, cast(PostgresDatabaseConfig, config.database)) + hasura_gateway = HasuraGateway(config.package, config.hasura, cast(PostgresDatabaseConfig, config.database)) async with tortoise_wrapper(url, models): - try: - await hasura.configure(reset) - finally: - await hasura.close_session() + async with hasura_gateway: + await hasura_gateway.configure(reset) diff --git a/src/dipdup/config.py b/src/dipdup/config.py index 65bb92679..5d7eb157e 100644 --- a/src/dipdup/config.py +++ b/src/dipdup/config.py @@ -93,9 +93,12 @@ def valid_immune_tables(cls, v): class HTTPConfig: cache: Optional[bool] = None retry_count: Optional[int] = None - retry_sleep: Optional[int] = None + retry_sleep: Optional[float] = None + retry_multiplier: Optional[float] = None ratelimit_rate: Optional[int] = None ratelimit_period: Optional[int] = None + connection_limit: Optional[int] = None + batch_size: Optional[int] = None def merge(self, other: Optional['HTTPConfig']) -> None: if not other: diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 92d7d7802..4322723f2 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -1,12 +1,11 @@ import asyncio import hashlib import logging -from contextlib import suppress +from contextlib import AsyncExitStack from os.path import join from posix import listdir from typing import Dict, List, Optional, cast -from apscheduler.schedulers import SchedulerNotRunningError # type: ignore from genericpath import exists from tortoise import Tortoise from tortoise.exceptions import OperationalError @@ -176,16 +175,18 @@ async def init(self) -> None: await self._create_datasources() codegen = DipDupCodeGenerator(self._config, self._datasources_by_config) - await codegen.create_package() - await codegen.fetch_schemas() - await codegen.generate_types() - await codegen.generate_default_handlers() - await codegen.generate_user_handlers() - await codegen.generate_jobs() - await codegen.cleanup() - for datasource in self._datasources.values(): - await datasource.close_session() + async with AsyncExitStack() as stack: + for datasource in self._datasources.values(): + await stack.enter_async_context(datasource) + + await codegen.create_package() + await codegen.fetch_schemas() + await codegen.generate_types() + await codegen.generate_default_handlers() + await codegen.generate_user_handlers() + await codegen.generate_jobs() + await codegen.cleanup() async def run(self, reindex: bool, oneshot: bool) -> None: """Main entrypoint""" @@ -193,28 +194,35 @@ async def run(self, reindex: bool, oneshot: bool) -> None: url = self._config.database.connection_string models = f'{self._config.package}.models' - async with tortoise_wrapper(url, models): + await self._create_datasources() + + hasura_gateway: Optional[HasuraGateway] + if 
self._config.hasura: + if not isinstance(self._config.database, PostgresDatabaseConfig): + raise RuntimeError + hasura_gateway = HasuraGateway(self._config.package, self._config.hasura, self._config.database) + else: + hasura_gateway = None + + async with AsyncExitStack() as stack: + worker_tasks = [] + await stack.enter_async_context(tortoise_wrapper(url, models)) + for datasource in self._datasources.values(): + await stack.enter_async_context(datasource) + if hasura_gateway: + await stack.enter_async_context(hasura_gateway) + worker_tasks.append(asyncio.create_task(hasura_gateway.configure())) + await self._initialize_database(reindex) - await self._create_datasources() await self._configure() self._logger.info('Starting datasources') datasource_tasks = [] if oneshot else [asyncio.create_task(d.run()) for d in self._datasources.values()] - worker_tasks = [] - - hasura_gateway: Optional[HasuraGateway] - if self._config.hasura: - if not isinstance(self._config.database, PostgresDatabaseConfig): - raise RuntimeError - hasura_gateway = HasuraGateway(self._config.package, self._config.hasura, self._config.database) - worker_tasks.append(asyncio.create_task(hasura_gateway.configure())) - else: - hasura_gateway = None if self._config.jobs and not oneshot: + stack.enter_context(self._scheduler) for job_name, job_config in self._config.jobs.items(): add_job(self._ctx, self._scheduler, job_name, job_config) - self._scheduler.start() worker_tasks.append(asyncio.create_task(self._index_dispatcher.run(oneshot))) @@ -222,14 +230,8 @@ async def run(self, reindex: bool, oneshot: bool) -> None: await asyncio.gather(*datasource_tasks, *worker_tasks) except KeyboardInterrupt: pass - finally: - self._logger.info('Closing datasource sessions') - await asyncio.gather(*[d.close_session() for d in self._datasources.values()]) - if hasura_gateway: - await hasura_gateway.close_session() - # FIXME: AttributeError: 'NoneType' object has no attribute 'call_soon_threadsafe' - with suppress(AttributeError, SchedulerNotRunningError): - self._scheduler.shutdown(wait=True) + except GeneratorExit: + pass async def migrate_to_v10(self) -> None: codegen = DipDupCodeGenerator(self._config, self._datasources_by_config) diff --git a/src/dipdup/http.py b/src/dipdup/http.py index 5871246f5..0bc52cd5e 100644 --- a/src/dipdup/http.py +++ b/src/dipdup/http.py @@ -22,6 +22,12 @@ def __init__(self, url: str, http_config: Optional[HTTPConfig] = None) -> None: self._http_config.merge(http_config) self._http = _HTTPGateway(url.rstrip('/'), self._http_config) + async def __aenter__(self) -> None: + await self._http.__aenter__() + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self._http.__aexit__(exc_type, exc, tb) + @abstractmethod def _default_http_config(self) -> HTTPConfig: ... 
@@ -50,7 +56,15 @@ def __init__(self, url: str, config: HTTPConfig) -> None: if config.ratelimit_rate and config.ratelimit_period else None ) - self._session = aiohttp.ClientSession() + self.__session: Optional[aiohttp.ClientSession] = None + + async def __aenter__(self) -> None: + self.__session = aiohttp.ClientSession( + connector=aiohttp.TCPConnector(limit=self._config.connection_limit or 100), + ) + + async def __aexit__(self, exc_type, exc, tb): + await self.__session.close() @property def user_agent(self) -> str: @@ -61,6 +75,12 @@ def user_agent(self) -> str: self._user_agent = user_agent return self._user_agent + @property + def _session(self) -> aiohttp.ClientSession: + if not self.__session: + raise RuntimeError('Session is not initialized. Wrap with `async with`') + return self.__session + async def _wrapped_request(self, method: str, url: str, **kwargs): attempts = list(range(self._config.retry_count)) if self._config.retry_count else [0] for attempt in attempts: diff --git a/src/dipdup/utils.py b/src/dipdup/utils.py index 58370440c..524c9a29d 100644 --- a/src/dipdup/utils.py +++ b/src/dipdup/utils.py @@ -6,8 +6,7 @@ import time from contextlib import asynccontextmanager from logging import Logger -from types import ModuleType -from typing import Any, AsyncIterator, Iterable, Iterator, List, Optional, Tuple, Type +from typing import Any, AsyncIterator, Iterator, List, Optional, Tuple, Type from tortoise import Tortoise from tortoise.backends.asyncpg.client import AsyncpgDBClient @@ -130,9 +129,7 @@ def set_decimal_context(package: str) -> None: if isinstance(field, DecimalField): context.prec = max(context.prec, field.max_digits + field.max_digits) if prec < context.prec: - _logger.warning( - 'Decimal context precision has been updated: %s -> %s', prec, context.prec - ) + _logger.warning('Decimal context precision has been updated: %s -> %s', prec, context.prec) # NOTE: DefaultContext used for new threads decimal.DefaultContext.prec = context.prec decimal.setcontext(context) diff --git a/tests/integration_tests/test_hasura.py b/tests/integration_tests/test_hasura.py index 0f88c890f..e4831f63e 100644 --- a/tests/integration_tests/test_hasura.py +++ b/tests/integration_tests/test_hasura.py @@ -38,7 +38,6 @@ async def test_configure_hasura(self): hasura_gateway = HasuraGateway('demo_hic_et_nunc', hasura_config, database_config) hasura_gateway._get_views = AsyncMock(return_value=[]) - await hasura_gateway.close_session() hasura_gateway._http = Mock() hasura_gateway._http.request = AsyncMock( side_effect=[ From 8c8b182b8dcb0734d465502c627b4dd2024e06fb Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 16:45:43 +0300 Subject: [PATCH 09/10] Connection limit --- src/dipdup/config.py | 11 ++++++----- src/dipdup/datasources/bcd/datasource.py | 1 + src/dipdup/datasources/tzkt/datasource.py | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/dipdup/config.py b/src/dipdup/config.py index 5d7eb157e..2724e5a87 100644 --- a/src/dipdup/config.py +++ b/src/dipdup/config.py @@ -165,12 +165,13 @@ class TzktDatasourceConfig(NameMixin): def __hash__(self): return hash(self.url) - @validator('url', allow_reuse=True) - def valid_url(cls, v): - parsed_url = urlparse(v) + def __post_init_post_parse__(self) -> None: + super().__post_init_post_parse__() + if self.http and self.http.batch_size and self.http.batch_size > 10000: + raise ConfigurationError('`batch_size` must be less than 10000') + parsed_url = urlparse(self.url) if not (parsed_url.scheme and 
parsed_url.netloc): - raise ConfigurationError(f'`{v}` is not a valid datasource URL') - return v + raise ConfigurationError(f'`{self.url}` is not a valid datasource URL') @dataclass diff --git a/src/dipdup/datasources/bcd/datasource.py b/src/dipdup/datasources/bcd/datasource.py index 88c4e72b9..b1adc74b1 100644 --- a/src/dipdup/datasources/bcd/datasource.py +++ b/src/dipdup/datasources/bcd/datasource.py @@ -53,4 +53,5 @@ def _default_http_config(self) -> HTTPConfig: retry_sleep=1, ratelimit_rate=100, ratelimit_period=30, + connection_limit=25, ) diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py index 7bcc309db..6eac16b70 100644 --- a/src/dipdup/datasources/tzkt/datasource.py +++ b/src/dipdup/datasources/tzkt/datasource.py @@ -25,7 +25,6 @@ OperationID = int -TZKT_HTTP_REQUEST_LIMIT = 10000 TZKT_ORIGINATIONS_REQUEST_LIMIT = 100 OPERATION_FIELDS = ( "type", @@ -291,7 +290,7 @@ def __init__( @property def request_limit(self) -> int: - return TZKT_HTTP_REQUEST_LIMIT + return self._http_config.batch_size or 10000 @property def level(self) -> Optional[int]: @@ -587,6 +586,7 @@ def _default_http_config(self) -> HTTPConfig: retry_sleep=1, ratelimit_rate=100, ratelimit_period=30, + connection_limit=25, ) async def _on_operation_message(self, message: List[Dict[str, Any]]) -> None: From c9560b5770f2e8a68c9e5872881f1ae95abd1a8c Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Wed, 14 Jul 2021 17:24:22 +0300 Subject: [PATCH 10/10] retry_multiplier --- src/dipdup/datasources/bcd/datasource.py | 2 +- src/dipdup/datasources/tzkt/datasource.py | 2 +- src/dipdup/hasura.py | 2 +- src/dipdup/http.py | 15 +++++++++------ 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/dipdup/datasources/bcd/datasource.py b/src/dipdup/datasources/bcd/datasource.py index b1adc74b1..5b28273f3 100644 --- a/src/dipdup/datasources/bcd/datasource.py +++ b/src/dipdup/datasources/bcd/datasource.py @@ -49,8 +49,8 @@ async def get_token(self, address: str, token_id: int) -> Optional[Dict[str, Any def _default_http_config(self) -> HTTPConfig: return HTTPConfig( cache=True, - retry_count=3, retry_sleep=1, + retry_multiplier=1.1, ratelimit_rate=100, ratelimit_period=30, connection_limit=25, diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py index 6eac16b70..97dc4eb07 100644 --- a/src/dipdup/datasources/tzkt/datasource.py +++ b/src/dipdup/datasources/tzkt/datasource.py @@ -582,8 +582,8 @@ async def subscribe_to_head(self) -> None: def _default_http_config(self) -> HTTPConfig: return HTTPConfig( cache=True, - retry_count=3, retry_sleep=1, + retry_multiplier=1.1, ratelimit_rate=100, ratelimit_period=30, connection_limit=25, diff --git a/src/dipdup/hasura.py b/src/dipdup/hasura.py index bc70ee851..6c9f31f9c 100644 --- a/src/dipdup/hasura.py +++ b/src/dipdup/hasura.py @@ -111,8 +111,8 @@ async def configure(self, reset: bool = False) -> None: def _default_http_config(self) -> HTTPConfig: return HTTPConfig( cache=False, - retry_count=3, retry_sleep=1, + retry_multiplier=1.1, ratelimit_rate=100, ratelimit_period=1, ) diff --git a/src/dipdup/http.py b/src/dipdup/http.py index 0bc52cd5e..2032bd767 100644 --- a/src/dipdup/http.py +++ b/src/dipdup/http.py @@ -82,9 +82,10 @@ def _session(self) -> aiohttp.ClientSession: return self.__session async def _wrapped_request(self, method: str, url: str, **kwargs): - attempts = list(range(self._config.retry_count)) if self._config.retry_count else [0] - for attempt in attempts: - 
self._logger.debug('HTTP request attempt %s/%s', attempt + 1, self._config.retry_count) + attempt = 1 + retry_sleep = self._config.retry_sleep or 0 + while True: + self._logger.debug('HTTP request attempt %s/%s', attempt + 1, self._config.retry_count or 'inf') try: return await self._request( method=method, @@ -92,10 +93,11 @@ async def _wrapped_request(self, method: str, url: str, **kwargs): **kwargs, ) except (aiohttp.ClientConnectionError, aiohttp.ClientConnectorError) as e: - if attempt + 1 == attempts[-1]: + if self._config.retry_count and attempt - 1 == self._config.retry_count: raise e self._logger.warning('HTTP request failed: %s', e) - await asyncio.sleep(self._config.retry_sleep or 0) + await asyncio.sleep(retry_sleep) + retry_sleep *= self._config.retry_multiplier or 1 except aiohttp.ClientResponseError as e: if e.code == HTTPStatus.TOO_MANY_REQUESTS: ratelimit_sleep = 5 @@ -107,10 +109,11 @@ async def _wrapped_request(self, method: str, url: str, **kwargs): self._logger.warning('HTTP request failed: %s', e) await asyncio.sleep(ratelimit_sleep) else: - if attempt + 1 == attempts[-1]: + if self._config.retry_count and attempt - 1 == self._config.retry_count: raise e self._logger.warning('HTTP request failed: %s', e) await asyncio.sleep(self._config.retry_sleep or 0) + retry_sleep *= self._config.retry_multiplier or 1 async def _request(self, method: str, url: str, **kwargs): """Wrapped aiohttp call with preconfigured headers and logging"""
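The new retry loop in `_wrapped_request` backs off exponentially: each connection failure sleeps for `retry_sleep`, then multiplies the delay by `retry_multiplier`, and with `retry_count` unset it retries forever (hence the `'inf'` in the debug log). A minimal sketch of the resulting schedule under the datasource defaults above (`retry_sleep=1`, `retry_multiplier=1.1`); `backoff_schedule` is an illustrative helper, not part of DipDup:

def backoff_schedule(retry_sleep=1.0, retry_multiplier=1.1, attempts=5):
    """Sleep durations before successive retries, as computed by the new loop."""
    delays = []
    sleep = retry_sleep
    for _ in range(attempts):
        delays.append(round(sleep, 3))
        sleep *= retry_multiplier
    return delays

print(backoff_schedule())  # [1.0, 1.1, 1.21, 1.331, 1.464]

A multiplier of 1.1 grows slowly, so brief outages are retried almost immediately while prolonged ones are gradually throttled; set `retry_count` to restore a hard cap on attempts.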