diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py index 69918465a..fe4ad1c9d 100644 --- a/src/dipdup/cli.py +++ b/src/dipdup/cli.py @@ -1,25 +1,15 @@ import asyncio -import hashlib import logging import os -import sys from dataclasses import dataclass from functools import wraps from os.path import dirname, join -from typing import Dict import click -from tortoise import Tortoise -from tortoise.exceptions import OperationalError -from tortoise.utils import get_schema_sql -import dipdup.codegen as codegen from dipdup import __version__ -from dipdup.config import DipDupConfig, IndexTemplateConfig, LoggingConfig, PostgresDatabaseConfig, TzktDatasourceConfig -from dipdup.datasources.tzkt.datasource import TzktDatasource -from dipdup.hasura import configure_hasura -from dipdup.models import IndexType, State -from dipdup.utils import reindex, tortoise_wrapper +from dipdup.config import DipDupConfig, LoggingConfig +from dipdup.dipdup import DipDup _logger = logging.getLogger(__name__) @@ -68,59 +58,8 @@ async def cli(ctx, config: str, logging_config: str): @click_async async def run(ctx) -> None: config: DipDupConfig = ctx.obj.config - - url = config.database.connection_string - models = f'{config.package}.models' - async with tortoise_wrapper(url, models): - _logger.info('Initializing database') - - connection_name, connection = next(iter(Tortoise._connections.items())) - schema_sql = get_schema_sql(connection, False) - - if isinstance(config.database, PostgresDatabaseConfig) and config.database.schema_name: - await Tortoise._connections['default'].execute_script("CREATE SCHEMA IF NOT EXISTS {}".format(config.database.schema_name)) - await Tortoise._connections['default'].execute_script("SET search_path TO {}".format(config.database.schema_name)) - - # NOTE: Column order could differ in two generated schemas for the same models, drop commas and sort strings to eliminate this - processed_schema_sql = '\n'.join(sorted(schema_sql.replace(',', '').split('\n'))).encode() - schema_hash = hashlib.sha256(processed_schema_sql).hexdigest() - - try: - schema_state = await State.get_or_none(index_type=IndexType.schema, index_name=connection_name) - except OperationalError: - schema_state = None - - if schema_state is None: - await Tortoise.generate_schemas() - schema_state = State(index_type=IndexType.schema, index_name=connection_name, hash=schema_hash) - await schema_state.save() - elif schema_state.hash != schema_hash: - _logger.warning('Schema hash mismatch, reindexing') - await reindex() - - await config.initialize() - - _logger.info('Fetching indexer state for dapp `%s`', config.package) - datasources: Dict[TzktDatasourceConfig, TzktDatasource] = {} - - for index_name, index_config in config.indexes.items(): - assert not isinstance(index_config, IndexTemplateConfig) - _logger.info('Processing index `%s`', index_name) - if isinstance(index_config.datasource, TzktDatasourceConfig): - if index_config.tzkt_config not in datasources: - datasources[index_config.tzkt_config] = TzktDatasource(index_config.tzkt_config.url) - datasources[index_config.tzkt_config].add_index(index_config) - else: - raise NotImplementedError(f'Datasource `{index_config.datasource}` is not supported') - - _logger.info('Starting datasources') - run_tasks = [asyncio.create_task(d.start()) for d in datasources.values()] - - if config.hasura: - hasura_task = asyncio.create_task(configure_hasura(config)) - run_tasks.append(hasura_task) - - await asyncio.gather(*run_tasks) + dipdup = DipDup(config) + await dipdup.run() @cli.command(help='Initialize new dipdap') @@ -128,9 +67,5 @@ async def run(ctx) -> None: @click_async async def init(ctx): config: DipDupConfig = ctx.obj.config - - await codegen.create_package(config) - await codegen.fetch_schemas(config) - await codegen.generate_types(config) - await codegen.generate_handlers(config) - await codegen.cleanup(config) + dipdup = DipDup(config) + await dipdup.init() diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py new file mode 100644 index 000000000..5cb5960e7 --- /dev/null +++ b/src/dipdup/dipdup.py @@ -0,0 +1,85 @@ +import asyncio +import hashlib +import logging +from typing import Dict + +from tortoise import Tortoise +from tortoise.exceptions import OperationalError +from tortoise.utils import get_schema_sql + +import dipdup.codegen as codegen +from dipdup import __version__ +from dipdup.config import DipDupConfig, IndexTemplateConfig, PostgresDatabaseConfig, TzktDatasourceConfig +from dipdup.datasources.tzkt.datasource import TzktDatasource +from dipdup.hasura import configure_hasura +from dipdup.models import IndexType, State +from dipdup.utils import reindex, tortoise_wrapper + + +class DipDup: + def __init__(self, config: DipDupConfig) -> None: + self._logger = logging.getLogger(__name__) + self._config = config + + async def init(self) -> None: + await codegen.create_package(self._config) + await codegen.fetch_schemas(self._config) + await codegen.generate_types(self._config) + await codegen.generate_handlers(self._config) + await codegen.cleanup(self._config) + + async def run(self) -> None: + url = self._config.database.connection_string + models = f'{self._config.package}.models' + async with tortoise_wrapper(url, models): + await self.initialize_database() + + await self._config.initialize() + + datasources: Dict[TzktDatasourceConfig, TzktDatasource] = {} + + for index_name, index_config in self._config.indexes.items(): + assert not isinstance(index_config, IndexTemplateConfig) + self._logger.info('Processing index `%s`', index_name) + if isinstance(index_config.datasource, TzktDatasourceConfig): + if index_config.tzkt_config not in datasources: + datasources[index_config.tzkt_config] = TzktDatasource(index_config.tzkt_config.url) + datasources[index_config.tzkt_config].add_index(index_config) + else: + raise NotImplementedError(f'Datasource `{index_config.datasource}` is not supported') + + self._logger.info('Starting datasources') + run_tasks = [asyncio.create_task(d.start()) for d in datasources.values()] + + if self._config.hasura: + hasura_task = asyncio.create_task(configure_hasura(self._config)) + run_tasks.append(hasura_task) + + await asyncio.gather(*run_tasks) + + async def initialize_database(self) -> None: + self._logger.info('Initializing database') + + if isinstance(self._config.database, PostgresDatabaseConfig) and self._config.database.schema_name: + await Tortoise._connections['default'].execute_script(f"CREATE SCHEMA IF NOT EXISTS {self._config.database.schema_name}") + await Tortoise._connections['default'].execute_script(f"SET search_path TO {self._config.database.schema_name}") + + connection_name, connection = next(iter(Tortoise._connections.items())) + schema_sql = get_schema_sql(connection, False) + + # NOTE: Column order could differ in two generated schemas for the same models, drop commas and sort strings to eliminate this + processed_schema_sql = '\n'.join(sorted(schema_sql.replace(',', '').split('\n'))).encode() + schema_hash = hashlib.sha256(processed_schema_sql).hexdigest() + + try: + schema_state = await State.get_or_none(index_type=IndexType.schema, index_name=connection_name) + except OperationalError: + schema_state = None + + if schema_state is None: + await Tortoise.generate_schemas() + schema_state = State(index_type=IndexType.schema, index_name=connection_name, hash=schema_hash) + await schema_state.save() + elif schema_state.hash != schema_hash: + self._logger.warning('Schema hash mismatch, reindexing') + await reindex() diff --git a/tests/integration_tests/test_codegen.py b/tests/integration_tests/test_codegen.py index f29d4ff1d..1c471901b 100644 --- a/tests/integration_tests/test_codegen.py +++ b/tests/integration_tests/test_codegen.py @@ -5,8 +5,8 @@ from shutil import rmtree from unittest import IsolatedAsyncioTestCase -from dipdup import codegen from dipdup.config import DipDupConfig +from dipdup.dipdup import DipDup # NOTE: https://gist.github.com/breeze1990/0253cb96ce04c00cb7a67feb2221e95e @@ -40,11 +40,8 @@ async def test_codegen(self): del sys.modules[config.package] try: - await codegen.create_package(config) - await codegen.fetch_schemas(config) - await codegen.generate_types(config) - await codegen.generate_handlers(config) - await codegen.cleanup(config) + dipdup = DipDup(config) + await dipdup.init() import_submodules(config.package)