Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 6 additions & 71 deletions src/dipdup/cli.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
import asyncio
import hashlib
import logging
import os
import sys
from dataclasses import dataclass
from functools import wraps
from os.path import dirname, join
from typing import Dict

import click
from tortoise import Tortoise
from tortoise.exceptions import OperationalError
from tortoise.utils import get_schema_sql

import dipdup.codegen as codegen
from dipdup import __version__
from dipdup.config import DipDupConfig, IndexTemplateConfig, LoggingConfig, PostgresDatabaseConfig, TzktDatasourceConfig
from dipdup.datasources.tzkt.datasource import TzktDatasource
from dipdup.hasura import configure_hasura
from dipdup.models import IndexType, State
from dipdup.utils import reindex, tortoise_wrapper
from dipdup.config import DipDupConfig, LoggingConfig
from dipdup.dipdup import DipDup

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,69 +58,14 @@ async def cli(ctx, config: str, logging_config: str):
@click_async
async def run(ctx) -> None:
config: DipDupConfig = ctx.obj.config

url = config.database.connection_string
models = f'{config.package}.models'
async with tortoise_wrapper(url, models):
_logger.info('Initializing database')

connection_name, connection = next(iter(Tortoise._connections.items()))
schema_sql = get_schema_sql(connection, False)

if isinstance(config.database, PostgresDatabaseConfig) and config.database.schema_name:
await Tortoise._connections['default'].execute_script("CREATE SCHEMA IF NOT EXISTS {}".format(config.database.schema_name))
await Tortoise._connections['default'].execute_script("SET search_path TO {}".format(config.database.schema_name))

# NOTE: Column order could differ in two generated schemas for the same models, drop commas and sort strings to eliminate this
processed_schema_sql = '\n'.join(sorted(schema_sql.replace(',', '').split('\n'))).encode()
schema_hash = hashlib.sha256(processed_schema_sql).hexdigest()

try:
schema_state = await State.get_or_none(index_type=IndexType.schema, index_name=connection_name)
except OperationalError:
schema_state = None

if schema_state is None:
await Tortoise.generate_schemas()
schema_state = State(index_type=IndexType.schema, index_name=connection_name, hash=schema_hash)
await schema_state.save()
elif schema_state.hash != schema_hash:
_logger.warning('Schema hash mismatch, reindexing')
await reindex()

await config.initialize()

_logger.info('Fetching indexer state for dapp `%s`', config.package)
datasources: Dict[TzktDatasourceConfig, TzktDatasource] = {}

for index_name, index_config in config.indexes.items():
assert not isinstance(index_config, IndexTemplateConfig)
_logger.info('Processing index `%s`', index_name)
if isinstance(index_config.datasource, TzktDatasourceConfig):
if index_config.tzkt_config not in datasources:
datasources[index_config.tzkt_config] = TzktDatasource(index_config.tzkt_config.url)
datasources[index_config.tzkt_config].add_index(index_config)
else:
raise NotImplementedError(f'Datasource `{index_config.datasource}` is not supported')

_logger.info('Starting datasources')
run_tasks = [asyncio.create_task(d.start()) for d in datasources.values()]

if config.hasura:
hasura_task = asyncio.create_task(configure_hasura(config))
run_tasks.append(hasura_task)

await asyncio.gather(*run_tasks)
dipdup = DipDup(config)
await dipdup.run()


@cli.command(help='Initialize new dipdap')
@click.pass_context
@click_async
async def init(ctx):
config: DipDupConfig = ctx.obj.config

await codegen.create_package(config)
await codegen.fetch_schemas(config)
await codegen.generate_types(config)
await codegen.generate_handlers(config)
await codegen.cleanup(config)
dipdup = DipDup(config)
await dipdup.init()
85 changes: 85 additions & 0 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import asyncio
import hashlib
import logging
from typing import Dict

from tortoise import Tortoise
from tortoise.exceptions import OperationalError
from tortoise.utils import get_schema_sql

import dipdup.codegen as codegen
from dipdup import __version__
from dipdup.config import DipDupConfig, IndexTemplateConfig, PostgresDatabaseConfig, TzktDatasourceConfig
from dipdup.datasources.tzkt.datasource import TzktDatasource
from dipdup.hasura import configure_hasura
from dipdup.models import IndexType, State
from dipdup.utils import reindex, tortoise_wrapper


class DipDup:
    """Main entry point of the framework: wires config, codegen, database and datasources.

    Extracted from the former CLI handlers so that `init` and `run` can be
    invoked programmatically with just a :class:`DipDupConfig`.
    """

    def __init__(self, config: DipDupConfig) -> None:
        # Logger is per-module; config drives everything else in this class.
        self._logger = logging.getLogger(__name__)
        self._config = config

    async def init(self) -> None:
        """Scaffold a new indexer package: create the package skeleton, fetch
        contract JSONSchemas, generate typed models and handler stubs, then
        remove temporary artifacts. Steps must run in this order — each stage
        consumes the previous stage's output on disk.
        """
        await codegen.create_package(self._config)
        await codegen.fetch_schemas(self._config)
        await codegen.generate_types(self._config)
        await codegen.generate_handlers(self._config)
        await codegen.cleanup(self._config)

    async def run(self) -> None:
        """Run the indexer: open the database, verify/create the schema,
        resolve configured indexes into datasources, and run all datasources
        (plus optional Hasura configuration) concurrently until they exit.

        Raises:
            NotImplementedError: if an index references a datasource type
                other than TzKT.
        """
        url = self._config.database.connection_string
        models = f'{self._config.package}.models'
        # tortoise_wrapper manages ORM init/teardown for the whole run.
        async with tortoise_wrapper(url, models):
            await self.initialize_database()

            # NOTE(review): presumably resolves templates/links inside the
            # config after the DB is ready — confirm in DipDupConfig.initialize.
            await self._config.initialize()

            # One TzktDatasource per distinct TzKT endpoint config; multiple
            # indexes pointing at the same endpoint share a datasource.
            datasources: Dict[TzktDatasourceConfig, TzktDatasource] = {}

            for index_name, index_config in self._config.indexes.items():
                # Templates must already be resolved into concrete indexes.
                assert not isinstance(index_config, IndexTemplateConfig)
                self._logger.info('Processing index `%s`', index_name)
                if isinstance(index_config.datasource, TzktDatasourceConfig):
                    if index_config.tzkt_config not in datasources:
                        datasources[index_config.tzkt_config] = TzktDatasource(index_config.tzkt_config.url)
                    datasources[index_config.tzkt_config].add_index(index_config)
                else:
                    raise NotImplementedError(f'Datasource `{index_config.datasource}` is not supported')

            self._logger.info('Starting datasources')
            run_tasks = [asyncio.create_task(d.start()) for d in datasources.values()]

            # Hasura metadata configuration runs alongside the datasources.
            if self._config.hasura:
                hasura_task = asyncio.create_task(configure_hasura(self._config))
                run_tasks.append(hasura_task)

            # Blocks until all tasks finish; any task exception propagates.
            await asyncio.gather(*run_tasks)

    async def initialize_database(self) -> None:
        """Create/validate the database schema and reindex on model changes.

        Hashes the SQL Tortoise would generate for the current models and
        compares it against the hash stored in the `State` table: first run
        generates the schema, a matching hash is a no-op, a mismatch triggers
        a full reindex. Must run inside an active Tortoise connection
        (see `run`). Statement order matters: the Postgres schema/search_path
        must be set before any schema SQL is generated or executed.
        """
        self._logger.info('Initializing database')

        # Postgres only: ensure the configured schema exists and make it the
        # default for subsequent DDL. NOTE(review): schema_name is interpolated
        # unquoted into DDL — assumed to come from trusted operator config.
        if isinstance(self._config.database, PostgresDatabaseConfig) and self._config.database.schema_name:
            await Tortoise._connections['default'].execute_script(f"CREATE SCHEMA IF NOT EXISTS {self._config.database.schema_name}")
            await Tortoise._connections['default'].execute_script(f"SET search_path TO {self._config.database.schema_name}")

        # NOTE(review): uses Tortoise's private `_connections` mapping and
        # assumes the first entry is the relevant connection — fragile across
        # tortoise-orm upgrades.
        connection_name, connection = next(iter(Tortoise._connections.items()))
        schema_sql = get_schema_sql(connection, False)

        # NOTE: Column order could differ in two generated schemas for the same models, drop commas and sort strings to eliminate this
        processed_schema_sql = '\n'.join(sorted(schema_sql.replace(',', '').split('\n'))).encode()
        schema_hash = hashlib.sha256(processed_schema_sql).hexdigest()

        try:
            schema_state = await State.get_or_none(index_type=IndexType.schema, index_name=connection_name)
        except OperationalError:
            # State table itself doesn't exist yet — treat as first run.
            schema_state = None

        if schema_state is None:
            # First run: create all tables, then record the schema hash.
            await Tortoise.generate_schemas()
            schema_state = State(index_type=IndexType.schema, index_name=connection_name, hash=schema_hash)
            await schema_state.save()
        elif schema_state.hash != schema_hash:
            # Models changed since last run: wipe and start over.
            self._logger.warning('Schema hash mismatch, reindexing')
            await reindex()
9 changes: 3 additions & 6 deletions tests/integration_tests/test_codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from shutil import rmtree
from unittest import IsolatedAsyncioTestCase

from dipdup import codegen
from dipdup.config import DipDupConfig
from dipdup.dipdup import DipDup


# NOTE: https://gist.github.com/breeze1990/0253cb96ce04c00cb7a67feb2221e95e
Expand Down Expand Up @@ -40,11 +40,8 @@ async def test_codegen(self):
del sys.modules[config.package]

try:
await codegen.create_package(config)
await codegen.fetch_schemas(config)
await codegen.generate_types(config)
await codegen.generate_handlers(config)
await codegen.cleanup(config)
dipdup = DipDup(config)
await dipdup.init()

import_submodules(config.package)

Expand Down