diff --git a/src/dipdup/__main__.py b/src/dipdup/__main__.py
index f044dc92a..58e1304d3 100644
--- a/src/dipdup/__main__.py
+++ b/src/dipdup/__main__.py
@@ -1,14 +1,4 @@
-import logging
-
 from dipdup.cli import cli
-from dipdup.exceptions import DipDupError
 
 if __name__ == '__main__':
-    try:
-        cli(prog_name='dipdup', standalone_mode=False)  # type: ignore
-    except KeyboardInterrupt:
-        pass
-    except DipDupError as e:
-        logging.critical(e.__repr__())
-        logging.info(e.format())
-        quit(e.exit_code)
+    cli(prog_name='dipdup', standalone_mode=False)  # type: ignore
diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py
index 27896f832..f6cd73cab 100644
--- a/src/dipdup/cli.py
+++ b/src/dipdup/cli.py
@@ -2,6 +2,7 @@
 import logging
 import os
 from dataclasses import dataclass
+from functools import wraps
 from os.path import dirname, exists, join
 from typing import List, cast
@@ -16,7 +17,7 @@
 from dipdup.codegen import DEFAULT_DOCKER_ENV_FILE, DEFAULT_DOCKER_IMAGE, DEFAULT_DOCKER_TAG, DipDupCodeGenerator
 from dipdup.config import DipDupConfig, LoggingConfig, PostgresDatabaseConfig
 from dipdup.dipdup import DipDup
-from dipdup.exceptions import ConfigurationError, MigrationRequiredError
+from dipdup.exceptions import ConfigurationError, DipDupError, MigrationRequiredError
 from dipdup.hasura import HasuraGateway
 from dipdup.utils import set_decimal_context, tortoise_wrapper
@@ -30,6 +31,21 @@ class CLIContext:
     logging_config: LoggingConfig
 
 
+def cli_wrapper(fn):
+    @wraps(fn)
+    async def wrapper(*args, **kwargs):
+        try:
+            return await fn(*args, **kwargs)
+        except KeyboardInterrupt:
+            pass
+        except DipDupError as e:
+            _logger.critical(e.__repr__())
+            _logger.info(e.format())
+            quit(e.exit_code)
+
+    return wrapper
+
+
 def init_sentry(config: DipDupConfig) -> None:
     if not config.sentry:
         return
@@ -59,6 +75,7 @@ def init_sentry(config: DipDupConfig) -> None:
 @click.option('--env-file', '-e', type=str, multiple=True, help='Path to .env file', default=[])
 @click.option('--logging-config', '-l', type=str, help='Path to logging YAML config', default='logging.yml')
 @click.pass_context
+@cli_wrapper
 async def cli(ctx, config: List[str], env_file: List[str], logging_config: str):
     try:
         path = join(os.getcwd(), logging_config)
@@ -96,6 +113,7 @@ async def cli(ctx, config: List[str], env_file: List[str], logging_config: str):
 @click.option('--reindex', is_flag=True, help='Drop database and start indexing from scratch')
 @click.option('--oneshot', is_flag=True, help='Synchronize indexes via REST and exit without starting WS connection')
 @click.pass_context
+@cli_wrapper
 async def run(ctx, reindex: bool, oneshot: bool) -> None:
     config: DipDupConfig = ctx.obj.config
     config.initialize()
@@ -106,6 +124,7 @@ async def run(ctx, reindex: bool, oneshot: bool) -> None:
 
 @cli.command(help='Initialize new dipdup project')
 @click.pass_context
+@cli_wrapper
 async def init(ctx):
     config: DipDupConfig = ctx.obj.config
     config.pre_initialize()
@@ -115,6 +134,7 @@ async def init(ctx):
 
 @cli.command(help='Migrate project to the new spec version')
 @click.pass_context
+@cli_wrapper
 async def migrate(ctx):
     def _bump_spec_version(spec_version: str):
         for config_path in ctx.obj.config_paths:
@@ -141,12 +161,14 @@ def _bump_spec_version(spec_version: str):
 
 @cli.command(help='Clear development request cache')
 @click.pass_context
+@cli_wrapper
 async def clear_cache(ctx):
     FileCache('dipdup', flag='cs').clear()
 
 
 @cli.group()
 @click.pass_context
+@cli_wrapper
 async def docker(ctx):
     ...
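The `cli_wrapper` decorator above centralizes the error handling that previously lived only in `__main__.py`, so every command gets it for free. A self-contained sketch of the pattern, with `AppError` standing in for `DipDupError` (whose real constructor is not shown in this diff):

```python
import logging
from functools import wraps

class AppError(Exception):  # stand-in for DipDupError
    exit_code = 1
    def format(self) -> str:
        return 'hints for the user go here'

def cli_wrapper(fn):
    @wraps(fn)
    async def wrapper(*args, **kwargs):
        try:
            return await fn(*args, **kwargs)
        except KeyboardInterrupt:
            pass
        except AppError as e:
            logging.critical(repr(e))
            logging.info(e.format())
            raise SystemExit(e.exit_code)
    return wrapper

@cli_wrapper
async def run_cmd() -> None:  # stands in for a Click command callback
    raise AppError('something went wrong')

# import asyncio; asyncio.run(run_cmd())  # logs the error, exits with code 1
```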
@@ -156,6 +178,7 @@ async def docker(ctx):
 @click.option('--tag', '-t', type=str, help='DipDup Docker tag', default=DEFAULT_DOCKER_TAG)
 @click.option('--env-file', '-e', type=str, help='Path to env_file', default=DEFAULT_DOCKER_ENV_FILE)
 @click.pass_context
+@cli_wrapper
 async def docker_init(ctx, image: str, tag: str, env_file: str):
     config: DipDupConfig = ctx.obj.config
     await DipDupCodeGenerator(config, {}).generate_docker(image, tag, env_file)
@@ -163,6 +186,7 @@
 
 @cli.group()
 @click.pass_context
+@cli_wrapper
 async def hasura(ctx):
     ...
@@ -170,6 +194,7 @@ async def hasura(ctx):
 @hasura.command(name='configure', help='Configure Hasura GraphQL Engine')
 @click.option('--reset', is_flag=True, help='Reset metadata before configuring')
 @click.pass_context
+@cli_wrapper
 async def hasura_configure(ctx, reset: bool):
     config: DipDupConfig = ctx.obj.config
     url = config.database.connection_string
diff --git a/src/dipdup/codegen.py b/src/dipdup/codegen.py
index 51c9e2452..e6910ddac 100644
--- a/src/dipdup/codegen.py
+++ b/src/dipdup/codegen.py
@@ -24,7 +24,7 @@
     OperationIndexConfig,
     TzktDatasourceConfig,
 )
-from dipdup.datasources import DatasourceT
+from dipdup.datasources.datasource import Datasource
 from dipdup.datasources.tzkt.datasource import TzktDatasource
 from dipdup.exceptions import ConfigurationError
 from dipdup.utils import import_submodules, mkdir_p, pascal_to_snake, snake_to_pascal, touch, write
@@ -68,7 +68,7 @@ def load_template(name: str) -> Template:
 class DipDupCodeGenerator:
     """Generates package based on config, invoked from `init` CLI command"""
 
-    def __init__(self, config: DipDupConfig, datasources: Dict[DatasourceConfigT, DatasourceT]) -> None:
+    def __init__(self, config: DipDupConfig, datasources: Dict[DatasourceConfigT, Datasource]) -> None:
         self._logger = logging.getLogger('dipdup.codegen')
         self._config = config
         self._datasources = datasources
diff --git a/src/dipdup/config.py b/src/dipdup/config.py
index 8604658b3..ecdf4e4b7 100644
--- a/src/dipdup/config.py
+++ b/src/dipdup/config.py
@@ -10,7 +10,7 @@
 from enum import Enum
 from os import environ as env
 from os.path import dirname
-from typing import Any, Callable, Dict, List, Optional, Sequence, Type, Union, cast
+from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Type, Union, cast
 from urllib.parse import urlparse
 
 from pydantic import Field, validator
@@ -285,6 +285,24 @@ def initialize_storage_cls(self, package: str, module_name: str) -> None:
         self.storage_type_cls = storage_type_cls
 
 
+@dataclass
+class ParentMixin:
+    """`parent` field for index and template configs"""
+
+    def __post_init_post_parse__(self):
+        self._parent: Optional['IndexConfig'] = None
+
+    @property
+    def parent(self) -> Optional['IndexConfig']:
+        return self._parent
+
+    @parent.setter
+    def parent(self, config: 'IndexConfig') -> None:
+        if self._parent:
+            raise RuntimeError('Can\'t unset parent once set')
+        self._parent = config
+
+
 @dataclass
 class ParameterTypeMixin:
     """`parameter_type_cls` field"""
@@ -459,10 +477,11 @@ def originated_contract_config(self) -> ContractConfig:
 
 
 @dataclass
-class HandlerConfig:
+class HandlerConfig(NameMixin):
     callback: str
 
     def __post_init_post_parse__(self):
+        super().__post_init_post_parse__()
         self._callback_fn = None
         if self.callback in (ROLLBACK_HANDLER, CONFIGURE_HANDLER):
             raise ConfigurationError(f'`{self.callback}` callback name is reserved')
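`ParentMixin` gives index and template configs a write-once `parent` link, used further down to propagate subscriptions from spawned indexes. A standalone demonstration of the set-once semantics, with a plain `Node` class standing in for the pydantic dataclasses:

```python
from typing import Optional

class Node:  # stand-in for an IndexConfig carrying ParentMixin behavior
    def __init__(self) -> None:
        self._parent: Optional['Node'] = None

    @property
    def parent(self) -> Optional['Node']:
        return self._parent

    @parent.setter
    def parent(self, node: 'Node') -> None:
        if self._parent:
            raise RuntimeError("Can't unset parent once set")
        self._parent = node

root, child = Node(), Node()
child.parent = root        # first assignment succeeds
try:
    child.parent = Node()  # reassignment is rejected
except RuntimeError as e:
    print(e)
```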
@@ -509,12 +528,13 @@ def template_values(self, value: Dict[str, str]) -> None:
 
 
 @dataclass
-class IndexConfig(TemplateValuesMixin, NameMixin):
+class IndexConfig(TemplateValuesMixin, NameMixin, ParentMixin):
     datasource: Union[str, TzktDatasourceConfig]
 
     def __post_init_post_parse__(self) -> None:
         TemplateValuesMixin.__post_init_post_parse__(self)
         NameMixin.__post_init_post_parse__(self)
+        ParentMixin.__post_init_post_parse__(self)
 
     def hash(self) -> str:
         config_json = json.dumps(self, default=pydantic_encoder)
@@ -532,10 +552,11 @@ def datasource_config(self) -> TzktDatasourceConfig:
 class OperationIndexConfig(IndexConfig):
     """Operation index config
 
-    :param datasource: Alias of datasource in `datasources` block
-    :param contract: Alias of contract to fetch operations for
-    :param first_block: First block to process
-    :param last_block: Last block to process
+    :param datasource: Alias of index datasource in `datasources` section
+    :param contracts: Aliases of contracts being indexed in `contracts` section
+    :param stateless: Makes index dynamic. DipDup will synchronize index from the first block on every run
+    :param first_block: First block to process (use with `--oneshot` run argument)
+    :param last_block: Last block to process (use with `--oneshot` run argument)
     :param handlers: List of indexer handlers
     """
@@ -557,6 +578,15 @@ def contract_configs(self) -> List[ContractConfig]:
             raise RuntimeError('Config is not initialized')
         return cast(List[ContractConfig], self.contracts)
 
+    @property
+    def entrypoints(self) -> Set[str]:
+        entrypoints = set()
+        for handler in self.handlers:
+            for pattern in handler.pattern:
+                if isinstance(pattern, OperationHandlerTransactionPatternConfig) and pattern.entrypoint:
+                    entrypoints.add(pattern.entrypoint)
+        return entrypoints
+
 
 @dataclass
 class BigMapHandlerConfig(HandlerConfig):
@@ -611,7 +641,7 @@ def contracts(self) -> List[ContractConfig]:
 
 
 @dataclass
-class IndexTemplateConfig:
+class IndexTemplateConfig(ParentMixin):
     kind = 'template'
     template: str
     values: Dict[str, str]
@@ -692,12 +722,12 @@ class DipDupConfig:
     spec_version: str
     package: str
     datasources: Dict[str, DatasourceConfigT]
+    database: Union[SqliteDatabaseConfig, PostgresDatabaseConfig] = SqliteDatabaseConfig(kind='sqlite')
     contracts: Dict[str, ContractConfig] = Field(default_factory=dict)
     indexes: Dict[str, IndexConfigT] = Field(default_factory=dict)
     templates: Dict[str, IndexConfigTemplateT] = Field(default_factory=dict)
-    database: Union[SqliteDatabaseConfig, PostgresDatabaseConfig] = SqliteDatabaseConfig(kind='sqlite')
+    jobs: Dict[str, JobConfig] = Field(default_factory=dict)
     hasura: Optional[HasuraConfig] = None
-    jobs: Optional[Dict[str, JobConfig]] = None
     sentry: Optional[SentryConfig] = None
 
     def __post_init_post_parse__(self):
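The new `OperationIndexConfig.entrypoints` property walks handler patterns and collects entrypoints of transaction patterns. Roughly equivalent logic, with plain dicts standing in for the pattern config classes:

```python
# Dicts stand in for handler/pattern config objects; only transaction
# patterns that name an entrypoint contribute to the result.
handlers = [
    {'pattern': [{'kind': 'transaction', 'entrypoint': 'mint'},
                 {'kind': 'transaction', 'entrypoint': 'transfer'}]},
    {'pattern': [{'kind': 'origination', 'entrypoint': None}]},
]
entrypoints = {
    p['entrypoint']
    for handler in handlers
    for p in handler['pattern']
    if p['kind'] == 'transaction' and p['entrypoint']
}
assert entrypoints == {'mint', 'transfer'}
```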
@@ -721,20 +751,27 @@ def validate(self) -> None:
             raise ConfigurationError('SQLite DB engine is not supported by Hasura')
 
     def get_contract(self, name: str) -> ContractConfig:
+        if name.startswith('<') and name.endswith('>'):
+            raise ConfigurationError(f'`{name}` variable of index template is not set')
+
         try:
             return self.contracts[name]
         except KeyError as e:
             raise ConfigurationError(f'Contract `{name}` not found in `contracts` config section') from e
 
     def get_datasource(self, name: str) -> DatasourceConfigT:
+        if name.startswith('<') and name.endswith('>'):
+            raise ConfigurationError(f'`{name}` variable of index template is not set')
+
         try:
             return self.datasources[name]
         except KeyError as e:
             raise ConfigurationError(f'Datasource `{name}` not found in `datasources` config section') from e
 
     def get_template(self, name: str) -> IndexConfigTemplateT:
-        if not self.templates:
-            raise ConfigurationError('`templates` section is missing')
+        if name.startswith('<') and name.endswith('>'):
+            raise ConfigurationError(f'`{name}` variable of index template is not set')
+
         try:
             return self.templates[name]
         except KeyError as e:
@@ -772,6 +809,7 @@ def resolve_index_templates(self) -> None:
             json_template = json.loads(raw_template)
             new_index_config = template.__class__(**json_template)
             new_index_config.template_values = index_config.values
+            new_index_config.parent = index_config.parent
             self.indexes[index_name] = new_index_config
 
     def _pre_initialize_index(self, index_name: str, index_config: IndexConfigT) -> None:
@@ -831,6 +869,8 @@ def pre_initialize(self) -> None:
             contract_config.name = name
         for name, datasource_config in self.datasources.items():
             datasource_config.name = name
+        for name, job_config in self.jobs.items():
+            job_config.name = name
 
         self.resolve_index_templates()
         for index_name, index_config in self.indexes.items():
diff --git a/src/dipdup/context.py b/src/dipdup/context.py
index 79d93ee6b..70cc82f6d 100644
--- a/src/dipdup/context.py
+++ b/src/dipdup/context.py
@@ -6,17 +6,17 @@
 from tortoise import Tortoise
 from tortoise.transactions import in_transaction
 
-from dipdup.config import ContractConfig, DipDupConfig, IndexTemplateConfig, PostgresDatabaseConfig
-from dipdup.datasources import DatasourceT
+from dipdup.config import ContractConfig, DipDupConfig, IndexConfig, IndexTemplateConfig, PostgresDatabaseConfig
+from dipdup.datasources.datasource import Datasource
 from dipdup.exceptions import ContractAlreadyExistsError, IndexAlreadyExistsError
 from dipdup.utils import FormattedLogger
 
 
-# TODO: Dataclasses are cool, everyone loves them. Resolve issue with pydantic in HandlerContext.
+# TODO: Dataclasses are cool, everyone loves them. Resolve issue with pydantic serialization.
 class DipDupContext:
     def __init__(
         self,
-        datasources: Dict[str, DatasourceT],
+        datasources: Dict[str, Datasource],
         config: DipDupConfig,
     ) -> None:
         self.datasources = datasources
@@ -74,16 +74,18 @@ class HandlerContext(DipDupContext):
 
     def __init__(
         self,
-        datasources: Dict[str, DatasourceT],
+        datasources: Dict[str, Datasource],
         config: DipDupConfig,
         logger: FormattedLogger,
         template_values: Optional[Dict[str, str]],
-        datasource: DatasourceT,
+        datasource: Datasource,
+        index_config: IndexConfig,
     ) -> None:
         super().__init__(datasources, config)
         self.logger = logger
         self.template_values = template_values
         self.datasource = datasource
+        self.index_config = index_config
 
     def add_contract(self, name: str, address: str, typename: Optional[str] = None) -> None:
         if name in self.config.contracts:
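All three `get_*` lookups now fail fast when handed an unresolved template placeholder such as `<contract>`, instead of surfacing a confusing `KeyError`. A minimal sketch of the guard, with `ValueError` standing in for `ConfigurationError`:

```python
def get_item(section: dict, name: str):
    # Unresolved template variables look like '<datasource>'
    if name.startswith('<') and name.endswith('>'):
        raise ValueError(f'`{name}` variable of index template is not set')
    try:
        return section[name]
    except KeyError as e:
        raise ValueError(f'`{name}` not found in config section') from e

contracts = {'registry': object()}
get_item(contracts, 'registry')       # ok
# get_item(contracts, '<contract>')   # ValueError: variable ... is not set
```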
@@ -102,21 +104,38 @@ def add_index(self, name: str, template: str, values: Dict[str, Any]) -> None:
             template=template,
             values=values,
         )
+        # NOTE: Notify datasource to subscribe to operations by entrypoint if enabled in index config
+        self.config.indexes[name].parent = self.index_config
         self._updated = True
 
 
-class RollbackHandlerContext(HandlerContext):
-    template_values: None
+class JobContext(DipDupContext):
+    """Job handler context."""
 
     def __init__(
         self,
-        datasources: Dict[str, DatasourceT],
+        datasources: Dict[str, Datasource],
         config: DipDupConfig,
         logger: FormattedLogger,
-        datasource: DatasourceT,
+    ) -> None:
+        super().__init__(datasources, config)
+        self.logger = logger
+
+    # TODO: Spawning indexes from jobs?
+
+
+class RollbackHandlerContext(DipDupContext):
+    def __init__(
+        self,
+        datasources: Dict[str, Datasource],
+        config: DipDupConfig,
+        logger: FormattedLogger,
+        datasource: Datasource,
         from_level: int,
         to_level: int,
     ) -> None:
-        super().__init__(datasources, config, logger, None, datasource)
+        super().__init__(datasources, config)
+        self.logger = logger
+        self.datasource = datasource
         self.from_level = from_level
         self.to_level = to_level
diff --git a/src/dipdup/datasources/__init__.py b/src/dipdup/datasources/__init__.py
index 79a04a51c..e69de29bb 100644
--- a/src/dipdup/datasources/__init__.py
+++ b/src/dipdup/datasources/__init__.py
@@ -1,7 +0,0 @@
-from typing import Union
-
-from dipdup.datasources.bcd.datasource import BcdDatasource
-from dipdup.datasources.coinbase.datasource import CoinbaseDatasource
-from dipdup.datasources.tzkt.datasource import TzktDatasource
-
-DatasourceT = Union[TzktDatasource, BcdDatasource, CoinbaseDatasource]
diff --git a/src/dipdup/datasources/bcd/datasource.py b/src/dipdup/datasources/bcd/datasource.py
index 5b28273f3..a9b9c18a9 100644
--- a/src/dipdup/datasources/bcd/datasource.py
+++ b/src/dipdup/datasources/bcd/datasource.py
@@ -2,12 +2,12 @@
 from typing import Any, Dict, List, Optional
 
 from dipdup.config import HTTPConfig
-from dipdup.http import HTTPGateway
+from dipdup.datasources.datasource import Datasource
 
 TOKENS_REQUEST_LIMIT = 10
 
 
-class BcdDatasource(HTTPGateway):
+class BcdDatasource(Datasource):
     def __init__(
         self,
         url: str,
diff --git a/src/dipdup/datasources/coinbase/datasource.py b/src/dipdup/datasources/coinbase/datasource.py
index 0526985d7..4a701a626 100644
--- a/src/dipdup/datasources/coinbase/datasource.py
+++ b/src/dipdup/datasources/coinbase/datasource.py
@@ -4,13 +4,13 @@
 
 from dipdup.config import HTTPConfig
 from dipdup.datasources.coinbase.models import CandleData, CandleInterval
-from dipdup.http import HTTPGateway
+from dipdup.datasources.datasource import Datasource
 
 CANDLES_REQUEST_LIMIT = 300
 API_URL = 'https://api.pro.coinbase.com'
 
 
-class CoinbaseDatasource(HTTPGateway):
+class CoinbaseDatasource(Datasource):
     def __init__(self, url: str = API_URL, http_config: Optional[HTTPConfig] = None) -> None:
         super().__init__(url, http_config)
         self._logger = logging.getLogger('dipdup.coinbase')
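With the `DatasourceT` union gone, every datasource shares a nominal base class (`Datasource`, defined in `datasources/datasource.py` just below), so type hints stay stable as new datasources are added. A minimal stand-in hierarchy illustrating the shape; `HTTPGateway` here is a trivial stub, not the real `dipdup.http` class:

```python
from abc import ABC, abstractmethod
from typing import Optional

class HTTPGateway:  # trivial stub for dipdup.http.HTTPGateway
    def __init__(self, url: str, http_config: Optional[object] = None) -> None:
        self._url = url

class Datasource(HTTPGateway, ABC):
    @abstractmethod
    async def run(self) -> None:
        ...

class PollingDatasource(Datasource):  # hypothetical third-party datasource
    async def run(self) -> None:
        pass  # e.g. poll a REST endpoint forever

def accepts_any(ds: Datasource) -> None:  # no Union needed anymore
    ...
```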
diff --git a/src/dipdup/datasources/datasource.py b/src/dipdup/datasources/datasource.py
index c08b775b9..2856b2f88 100644
--- a/src/dipdup/datasources/datasource.py
+++ b/src/dipdup/datasources/datasource.py
@@ -1,6 +1,12 @@
+from abc import abstractmethod
+from collections import defaultdict
+from copy import deepcopy
 from enum import Enum
-from typing import Awaitable, List, Optional, Protocol
+from functools import partial
+from typing import Awaitable, DefaultDict, List, Optional, Protocol, Set
 
+from pydantic.dataclasses import dataclass
+from pydantic.fields import Field
 from pyee import AsyncIOEventEmitter  # type: ignore
 
 from dipdup.config import HTTPConfig
@@ -35,7 +41,13 @@ def __call__(self, datasource: 'IndexDatasource', block: HeadBlockData) -> Awaitable[None]:
         ...
 
 
-class IndexDatasource(HTTPGateway, AsyncIOEventEmitter):
+class Datasource(HTTPGateway):
+    @abstractmethod
+    async def run(self) -> None:
+        ...
+
+
+class IndexDatasource(Datasource, AsyncIOEventEmitter):
     def __init__(self, url: str, http_config: Optional[HTTPConfig] = None) -> None:
         HTTPGateway.__init__(self, url, http_config)
         AsyncIOEventEmitter.__init__(self)
@@ -71,3 +83,47 @@ def emit_rollback(self, from_level: int, to_level: int) -> None:
 
     def emit_head(self, block: HeadBlockData) -> None:
         super().emit(EventType.head, datasource=self, block=block)
+
+
+@dataclass
+class Subscriptions:
+    address_transactions: Set[str] = Field(default_factory=set)
+    originations: bool = False
+    head: bool = False
+    big_maps: DefaultDict[str, Set[str]] = Field(default_factory=partial(defaultdict, set))
+
+    def get_pending(self, active_subscriptions: 'Subscriptions') -> 'Subscriptions':
+        return Subscriptions(
+            address_transactions=self.address_transactions.difference(active_subscriptions.address_transactions),
+            originations=not active_subscriptions.originations,
+            head=not active_subscriptions.head,
+            big_maps=defaultdict(set, {k: self.big_maps[k] for k in set(self.big_maps) - set(active_subscriptions.big_maps)}),
+        )
+
+
+class SubscriptionManager:
+    def __init__(self) -> None:
+        self._subscriptions: Subscriptions = Subscriptions()
+        self._active_subscriptions: Subscriptions = Subscriptions()
+
+    def add_address_transaction_subscription(self, address: str) -> None:
+        self._subscriptions.address_transactions.add(address)
+
+    def add_origination_subscription(self) -> None:
+        self._subscriptions.originations = True
+
+    def add_head_subscription(self) -> None:
+        self._subscriptions.head = True
+
+    def add_big_map_subscription(self, address: str, paths: Set[str]) -> None:
+        self._subscriptions.big_maps[address] = self._subscriptions.big_maps[address] | paths
+
+    def get_pending(self) -> Subscriptions:
+        pending_subscriptions = self._subscriptions.get_pending(self._active_subscriptions)
+        return pending_subscriptions
+
+    def commit(self) -> None:
+        # Deep copy: a shallow copy would share the underlying sets, so later
+        # additions would leak into the active snapshot and never show up as pending
+        self._active_subscriptions = deepcopy(self._subscriptions)
+
+    def reset(self) -> None:
+        self._active_subscriptions = Subscriptions()
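Assuming the module layout above, `SubscriptionManager` can be exercised on its own. For addresses and big map paths, `get_pending` returns only the delta between requested and active subscriptions (given that `commit` snapshots a deep copy), so re-running `subscribe()` after a runtime `add_index` never re-sends what the server already knows; `reset()` makes everything pending again after a reconnect. `'KT1...'` is a placeholder address:

```python
from dipdup.datasources.datasource import SubscriptionManager

manager = SubscriptionManager()
manager.add_address_transaction_subscription('KT1...')   # placeholder address
assert manager.get_pending().address_transactions         # not yet active
manager.commit()                                          # snapshot as active

manager.add_address_transaction_subscription('KT1...')   # duplicate request
assert not manager.get_pending().address_transactions     # no delta to send

manager.add_big_map_subscription('KT1...', {'ledger'})
assert manager.get_pending().big_maps                     # only the new entry
manager.commit()

manager.reset()                                           # e.g. after reconnect
assert manager.get_pending().address_transactions         # pending again
```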
diff --git a/src/dipdup/datasources/tzkt/datasource.py b/src/dipdup/datasources/tzkt/datasource.py
index 7b7a16eef..f1ac09abb 100644
--- a/src/dipdup/datasources/tzkt/datasource.py
+++ b/src/dipdup/datasources/tzkt/datasource.py
@@ -18,7 +18,7 @@
     OperationHandlerOriginationPatternConfig,
     OperationIndexConfig,
 )
-from dipdup.datasources.datasource import IndexDatasource
+from dipdup.datasources.datasource import IndexDatasource, SubscriptionManager
 from dipdup.datasources.tzkt.enums import TzktMessageType
 from dipdup.models import BigMapAction, BigMapData, BlockData, HeadBlockData, OperationData
 from dipdup.utils import split_by_chunks
@@ -275,13 +275,13 @@ def __init__(
         self,
         url: str,
         http_config: Optional[HTTPConfig] = None,
+        realtime: bool = True,
     ) -> None:
         super().__init__(url, http_config)
         self._logger = logging.getLogger('dipdup.tzkt')
-        self._transaction_subscriptions: Set[str] = set()
-        self._origination_subscriptions: bool = False
-        self._big_map_subscriptions: Dict[str, List[str]] = {}
+        self._subscriptions: SubscriptionManager = SubscriptionManager()
+        self._realtime: bool = realtime
         self._client: Optional[BaseHubConnection] = None
         self._block: Optional[HeadBlockData] = None
@@ -454,32 +454,26 @@ async def get_big_maps(
         return big_maps
 
     async def add_index(self, index_config: IndexConfigTemplateT) -> None:
-        """Register index config in internal mappings and matchers. Find and register subscriptions.
-        If called in runtime need to `resync` then."""
+        """Register index config in internal mappings and matchers. Find and register subscriptions."""
         if isinstance(index_config, OperationIndexConfig):
-
             for contract_config in index_config.contracts or []:
-                self._transaction_subscriptions.add(cast(ContractConfig, contract_config).address)
+                self._subscriptions.add_address_transaction_subscription(cast(ContractConfig, contract_config).address)
             for handler_config in index_config.handlers:
                 for pattern_config in handler_config.pattern:
                     if isinstance(pattern_config, OperationHandlerOriginationPatternConfig):
-                        self._origination_subscriptions = True
+                        self._subscriptions.add_origination_subscription()
 
         elif isinstance(index_config, BigMapIndexConfig):
-
             for big_map_handler_config in index_config.handlers:
                 address, path = big_map_handler_config.contract_config.address, big_map_handler_config.path
-                if address not in self._big_map_subscriptions:
-                    self._big_map_subscriptions[address] = []
-                if path not in self._big_map_subscriptions[address]:
-                    self._big_map_subscriptions[address].append(path)
+                self._subscriptions.add_big_map_subscription(address, {path})
 
         else:
             raise NotImplementedError(f'Index kind `{index_config.kind}` is not supported')
 
-        await self._on_connect()
+        await self.subscribe()
 
     def _get_client(self) -> BaseHubConnection:
         """Create SignalR client, register message callbacks"""
@@ -508,6 +502,7 @@
     async def run(self) -> None:
         """Main loop. Sync indexes via REST, start WS connection"""
+        # TODO: Update docs, honor `realtime` flag
         self._logger.info('Starting datasource')
 
         self._logger.info('Starting websocket client')
@@ -515,24 +510,36 @@
     async def _on_connect(self) -> None:
         """Subscribe to all required channels on established WS connection"""
-        if self._get_client().transport.state != ConnectionState.connected:
+        self._logger.info('Connected to server')
+        self._subscriptions.reset()
+        await self.subscribe()
+
+    async def subscribe(self) -> None:
+        """Subscribe to all required channels"""
+        if not self._realtime:
             return
 
-        self._logger.info('Connected to server')
-        await self.subscribe_to_head()
-        for address in self._transaction_subscriptions:
-            await self.subscribe_to_transactions(address)
-        # NOTE: All originations are passed to matcher
-        if self._origination_subscriptions:
-            await self.subscribe_to_originations()
-        for address, paths in self._big_map_subscriptions.items():
-            await self.subscribe_to_big_maps(address, paths)
+        pending_subscriptions = self._subscriptions.get_pending()
+
+        for address in pending_subscriptions.address_transactions:
+            await self._subscribe_to_address_transactions(address)
+        if pending_subscriptions.originations:
+            await self._subscribe_to_originations()
+        if pending_subscriptions.head:
+            await self._subscribe_to_head()
+        for address, paths in pending_subscriptions.big_maps.items():
+            await self._subscribe_to_big_maps(address, paths)
+        self._subscriptions.commit()
 
+    # NOTE: This is not a pyee callback
     def _on_error(self, message: CompletionMessage) -> NoReturn:
         """Raise exception from WS server's error message"""
         raise Exception(message.error)
 
-    async def subscribe_to_transactions(self, address: str) -> None:
+    # TODO: Catch exceptions from pyee 'error' channel
+
+    async def _subscribe_to_address_transactions(self, address: str) -> None:
         """Subscribe to contract's operations on established WS connection"""
         self._logger.info('Subscribing to %s transactions', address)
         await self._send(
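One subtlety in `add_index` above: `path` is a single dotted-path string, so it must be wrapped in a set literal before being handed to `add_big_map_subscription`; `set(path)` would iterate the string character by character:

```python
path = 'ledger'
assert set(path) == {'l', 'e', 'd', 'g', 'r'}  # characters, not paths
assert {path} == {'ledger'}                    # a one-element set, as intended
```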
@@ -545,7 +552,7 @@ async def subscribe_to_transactions(self, address: str) -> None:
             ],
         )
 
-    async def subscribe_to_originations(self) -> None:
+    async def _subscribe_to_originations(self) -> None:
         """Subscribe to all originations on established WS connection"""
         self._logger.info('Subscribing to originations')
         await self._send(
@@ -557,7 +564,7 @@ async def subscribe_to_originations(self) -> None:
             ],
         )
 
-    async def subscribe_to_big_maps(self, address: str, paths: List[str]) -> None:
+    async def _subscribe_to_big_maps(self, address: str, paths: Set[str]) -> None:
         """Subscribe to contract's big map diffs on established WS connection"""
         self._logger.info('Subscribing to big map updates of %s, %s', address, paths)
         for path in paths:
@@ -571,7 +578,7 @@ async def subscribe_to_big_maps(self, address: str, paths: List[str]) -> None:
             ],
         )
 
-    async def subscribe_to_head(self) -> None:
+    async def _subscribe_to_head(self) -> None:
         """Subscribe to head on established WS connection"""
         self._logger.info('Subscribing to head')
         await self._send(
diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py
index 973c681b7..73af61933 100644
--- a/src/dipdup/dipdup.py
+++ b/src/dipdup/dipdup.py
@@ -31,10 +31,9 @@
     TzktDatasourceConfig,
 )
 from dipdup.context import DipDupContext, RollbackHandlerContext
-from dipdup.datasources import DatasourceT
 from dipdup.datasources.bcd.datasource import BcdDatasource
 from dipdup.datasources.coinbase.datasource import CoinbaseDatasource
-from dipdup.datasources.datasource import IndexDatasource
+from dipdup.datasources.datasource import Datasource, IndexDatasource
 from dipdup.datasources.tzkt.datasource import TzktDatasource
 from dipdup.exceptions import ConfigurationError, ReindexingRequiredError
 from dipdup.hasura import HasuraGateway
@@ -177,8 +176,8 @@ class DipDup:
     def __init__(self, config: DipDupConfig) -> None:
         self._logger = logging.getLogger('dipdup')
         self._config = config
-        self._datasources: Dict[str, DatasourceT] = {}
-        self._datasources_by_config: Dict[DatasourceConfigT, DatasourceT] = {}
+        self._datasources: Dict[str, Datasource] = {}
+        self._datasources_by_config: Dict[DatasourceConfigT, Datasource] = {}
         self._ctx = DipDupContext(
             config=self._config,
             datasources=self._datasources,
@@ -189,7 +188,7 @@ def __init__(self, config: DipDupConfig) -> None:
 
     async def init(self) -> None:
         """Create new or update existing dipdup project"""
-        await self._create_datasources()
+        await self._create_datasources(realtime=False)
 
         async with AsyncExitStack() as stack:
             for datasource in self._datasources.values():
@@ -206,7 +205,7 @@ async def run(self, reindex: bool, oneshot: bool) -> None:
         url = self._config.database.connection_string
         models = f'{self._config.package}.models'
 
-        await self._create_datasources()
+        await self._create_datasources(realtime=not oneshot)
 
         hasura_gateway: Optional[HasuraGateway]
         if self._config.hasura:
@@ -262,8 +261,8 @@ async def _configure(self) -> None:
             await configure_fn(self._ctx)
         self._config.initialize()
 
-    async def _create_datasources(self) -> None:
-        datasource: DatasourceT
+    async def _create_datasources(self, realtime: bool = True) -> None:
+        datasource: Datasource
         for name, datasource_config in self._config.datasources.items():
             if name in self._datasources:
                 continue
@@ -272,6 +271,7 @@ async def _create_datasources(self) -> None:
                 datasource = TzktDatasource(
                     url=datasource_config.url,
                     http_config=datasource_config.http,
+                    realtime=realtime,
                 )
             elif isinstance(datasource_config, BcdDatasourceConfig):
                 datasource = BcdDatasource(
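`realtime` defaults to `True` and is threaded from `run --oneshot` (and from `init`, which never needs a websocket) down to `TzktDatasource.subscribe()`, which returns early when the flag is unset. A minimal model of that guard, with a stub class standing in for the real datasource:

```python
import asyncio

class StubDatasource:  # models TzktDatasource's early return, not the real class
    def __init__(self, realtime: bool) -> None:
        self._realtime = realtime
        self.sent: list = []

    async def subscribe(self) -> None:
        if not self._realtime:
            return
        self.sent.append('SubscribeToHead')

ds = StubDatasource(realtime=False)
asyncio.run(ds.subscribe())
assert ds.sent == []  # oneshot mode: REST sync only, no WS subscriptions
```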
diff --git a/src/dipdup/http.py b/src/dipdup/http.py
index 165b75c69..6bef51560 100644
--- a/src/dipdup/http.py
+++ b/src/dipdup/http.py
@@ -86,7 +86,7 @@ async def _wrapped_request(self, method: str, url: str, **kwargs):
         attempt = 1
         retry_sleep = self._config.retry_sleep or 0
         while True:
-            self._logger.debug('HTTP request attempt %s/%s', attempt + 1, self._config.retry_count or 'inf')
+            self._logger.debug('HTTP request attempt %s/%s', attempt, self._config.retry_count or 'inf')
             try:
                 return await self._request(
                     method=method,
diff --git a/src/dipdup/index.py b/src/dipdup/index.py
index e6be7185c..376a0eede 100644
--- a/src/dipdup/index.py
+++ b/src/dipdup/index.py
@@ -1,4 +1,3 @@
-import logging
 from abc import abstractmethod
 from collections import defaultdict, deque, namedtuple
 from contextlib import suppress
@@ -36,7 +35,7 @@ def __init__(self, ctx: DipDupContext, config: IndexConfigTemplateT, datasource:
         self._config = config
         self._datasource = datasource
 
-        self._logger = logging.getLogger('dipdup.index')
+        self._logger = FormattedLogger('dipdup.index', fmt=f'{config.name}: ' + '{}')
         self._state: Optional[State] = None
 
     @property
@@ -75,7 +74,7 @@ async def _process_queue(self) -> None:
         ...
 
     async def _initialize_index_state(self) -> None:
-        self._logger.info('Getting state for index `%s`', self._config.name)
+        self._logger.info('Getting index state')
         index_config_hash = self._config.hash()
         state = await State.get_or_none(
             index_name=self._config.name,
@@ -94,6 +93,7 @@ async def _initialize_index_state(self) -> None:
             self._logger.warning('Config hash mismatch (config has been changed), reindexing')
             await self._ctx.reindex()
 
+        self._logger.info('%s', f'{state.level=} {state.hash=}'.replace('state.', ''))
         # NOTE: No need to check genesis block
         if state.level:
             block = await self._datasource.get_block(state.level)
@@ -162,6 +162,7 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
 
         state.level = last_level  # type: ignore
         await state.save()
+        self._logger.info('Index is synchronized to level %s', last_level)
 
     async def _process_level_operations(self, level: int, operations: List[OperationData], block: Optional[HeadBlockData] = None) -> None:
         state = await self.get_state()
@@ -343,6 +344,7 @@ async def _on_match(
             logger=logger,
             template_values=self._config.template_values,
             datasource=self.datasource,
+            index_config=self._config,
         )
         await handler_config.callback_fn(handler_context, *args)
 
@@ -429,6 +431,7 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
 
         state.level = last_level  # type: ignore
         await state.save()
+        self._logger.info('Index is synchronized to level %s', last_level)
 
     async def _process_level_big_maps(self, level: int, big_maps: List[BigMapData]):
         state = await self.get_state()
@@ -492,6 +495,7 @@ async def _on_match(
             logger=logger,
             template_values=self._config.template_values,
             datasource=self.datasource,
+            index_config=self._config,
         )
         await handler_config.callback_fn(handler_context, big_map_context)
diff --git a/src/dipdup/scheduler.py b/src/dipdup/scheduler.py
index d80f8fec7..4b0a8daeb 100644
--- a/src/dipdup/scheduler.py
+++ b/src/dipdup/scheduler.py
@@ -8,8 +8,8 @@
 from pytz import utc
 
 from dipdup.config import JobConfig
-from dipdup.context import DipDupContext
-from dipdup.utils import in_global_transaction
+from dipdup.context import DipDupContext, JobContext
+from dipdup.utils import FormattedLogger, in_global_transaction
 
 jobstores = {
     'default': MemoryJobStore(),
@@ -17,17 +17,12 @@
 executors = {
     'default': AsyncIOExecutor(),
 }
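Index loggers are now `FormattedLogger` instances that prefix every message with the index name, which is why per-message interpolation like `'Getting state for index %s'` becomes unnecessary. A rough stand-in built on `logging.LoggerAdapter` (the real class lives in `dipdup.utils` and may differ):

```python
import logging

class PrefixLogger(logging.LoggerAdapter):  # stand-in for dipdup.utils.FormattedLogger
    def __init__(self, name: str, fmt: str) -> None:
        super().__init__(logging.getLogger(name), {})
        self._fmt = fmt

    def process(self, msg, kwargs):
        return self._fmt.format(msg), kwargs

logging.basicConfig(level=logging.INFO)
logger = PrefixLogger('dipdup.index', fmt='my_index: {}')
logger.info('Index is synchronized to level %s', 1337)
# INFO:dipdup.index:my_index: Index is synchronized to level 1337
```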
 
-job_defaults = {
-    'coalesce': False,
-    'max_instances': 3,
-}
-
 
 def create_scheduler() -> AsyncIOScheduler:
     return AsyncIOScheduler(
         jobstores=jobstores,
         executors=executors,
-        job_defaults=job_defaults,
         timezone=utc,
     )
@@ -40,6 +35,10 @@ async def _wrapper(ctx, args) -> None:
             await stack.enter_async_context(in_global_transaction())
             await job_config.callback_fn(ctx, args)
 
+    logger = FormattedLogger(
+        name=job_config.callback,
+        fmt=job_config.name + ': {}',
+    )
     if job_config.crontab:
         trigger = CronTrigger.from_crontab(job_config.crontab)
     elif job_config.interval:
@@ -50,7 +49,11 @@ async def _wrapper(ctx, args) -> None:
         name=job_name,
         trigger=trigger,
         kwargs=dict(
-            ctx=ctx,
+            ctx=JobContext(
+                config=ctx.config,
+                datasources=ctx.datasources,
+                logger=logger,
+            ),
             args=job_config.args,
         ),
     )
diff --git a/tests/integration_tests/test_rollback.py b/tests/integration_tests/test_rollback.py
index aeee9ac25..f34ceb563 100644
--- a/tests/integration_tests/test_rollback.py
+++ b/tests/integration_tests/test_rollback.py
@@ -94,7 +94,7 @@ async def test_rollback_ok(self):
         config.database.path = ':memory:'
 
         datasource_name, datasource_config = list(config.datasources.items())[0]
-        datasource = TzktDatasource('test')
+        datasource = TzktDatasource('test', realtime=False)
         dipdup = DipDup(config)
         dipdup._datasources[datasource_name] = datasource
         dipdup._datasources_by_config[datasource_config] = datasource
@@ -121,7 +121,7 @@ async def test_rollback_fail(self):
         config.database.path = ':memory:'
 
         datasource_name, datasource_config = list(config.datasources.items())[0]
-        datasource = TzktDatasource('test')
+        datasource = TzktDatasource('test', realtime=False)
         dipdup = DipDup(config)
         dipdup._datasources[datasource_name] = datasource
         dipdup._datasources_by_config[datasource_config] = datasource
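With this wiring, a job callback receives a `JobContext` whose logger already carries the job-name prefix. A hypothetical callback matching the `kwargs=dict(ctx=..., args=...)` call above; the signature is inferred from this diff, not from dipdup documentation:

```python
# Hypothetical job callback; `ctx` is the JobContext built in the scheduler above.
async def refresh_stats(ctx, args) -> None:
    ctx.logger.info('Started with args %s', args)  # logged as 'job_name: ...'
    # ctx.config and ctx.datasources are available as on any DipDupContext
```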