Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
### Added

* Human-readable `CHANGELOG.md` 🕺

### Changed

* Reindexing on schema hash mismatch is disabled until the TimescaleDB issues are resolved.
* Added the `--forbid-reindexing` option to the `dipdup run` command. When this flag is set, DipDup raises `ReindexingRequiredError` instead of truncating the database whenever reindexing is triggered, for any reason.

### Fixed

Expand Down
3 changes: 2 additions & 1 deletion src/demo_hic_et_nunc/hooks/on_rollback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dipdup.context import HookContext
from dipdup.datasources.datasource import Datasource
from dipdup.exceptions import ReindexingReason


async def on_rollback(
Expand All @@ -9,4 +10,4 @@ async def on_rollback(
to_level: int,
) -> None:
await ctx.execute_sql('on_rollback')
await ctx.reindex(reason='reorg message received')
await ctx.reindex(ReindexingReason.ROLLBACK)
3 changes: 2 additions & 1 deletion src/demo_quipuswap/hooks/on_rollback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dipdup.context import HookContext
from dipdup.datasources.datasource import Datasource
from dipdup.exceptions import ReindexingReason


async def on_rollback(
Expand All @@ -9,4 +10,4 @@ async def on_rollback(
to_level: int,
) -> None:
await ctx.execute_sql('on_rollback')
await ctx.reindex(reason='reorg message received')
await ctx.reindex(ReindexingReason.ROLLBACK)
3 changes: 2 additions & 1 deletion src/demo_registrydao/hooks/on_rollback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dipdup.context import HookContext
from dipdup.datasources.datasource import Datasource
from dipdup.exceptions import ReindexingReason


async def on_rollback(
Expand All @@ -9,4 +10,4 @@ async def on_rollback(
to_level: int,
) -> None:
await ctx.execute_sql('on_rollback')
await ctx.reindex(reason='reorg message received')
await ctx.reindex(ReindexingReason.ROLLBACK)
3 changes: 2 additions & 1 deletion src/demo_tezos_domains/hooks/on_rollback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dipdup.context import HookContext
from dipdup.datasources.datasource import Datasource
from dipdup.exceptions import ReindexingReason


async def on_rollback(
Expand All @@ -9,4 +10,4 @@ async def on_rollback(
to_level: int,
) -> None:
await ctx.execute_sql('on_rollback')
await ctx.reindex(reason='reorg message received')
await ctx.reindex(ReindexingReason.ROLLBACK)
3 changes: 2 additions & 1 deletion src/demo_tezos_domains_big_map/hooks/on_rollback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dipdup.context import HookContext
from dipdup.datasources.datasource import Datasource
from dipdup.exceptions import ReindexingReason


async def on_rollback(
Expand All @@ -9,4 +10,4 @@ async def on_rollback(
to_level: int,
) -> None:
await ctx.execute_sql('on_rollback')
await ctx.reindex(reason='reorg message received')
await ctx.reindex(ReindexingReason.ROLLBACK)
3 changes: 2 additions & 1 deletion src/demo_tzbtc/hooks/on_rollback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dipdup.context import HookContext
from dipdup.datasources.datasource import Datasource
from dipdup.exceptions import ReindexingReason


async def on_rollback(
Expand All @@ -9,4 +10,4 @@ async def on_rollback(
to_level: int,
) -> None:
await ctx.execute_sql('on_rollback')
await ctx.reindex(reason='reorg message received')
await ctx.reindex(ReindexingReason.ROLLBACK)
3 changes: 2 additions & 1 deletion src/demo_tzcolors/hooks/on_rollback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dipdup.context import HookContext
from dipdup.datasources.datasource import Datasource
from dipdup.exceptions import ReindexingReason


async def on_rollback(
Expand All @@ -9,4 +10,4 @@ async def on_rollback(
to_level: int,
) -> None:
await ctx.execute_sql('on_rollback')
await ctx.reindex(reason='reorg message received')
await ctx.reindex(ReindexingReason.ROLLBACK)
6 changes: 5 additions & 1 deletion src/dipdup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.logging import LoggingIntegration

import dipdup.context as context
from dipdup import __spec_version__, __version__, spec_reindex_mapping, spec_version_mapping
from dipdup.codegen import DEFAULT_DOCKER_ENV_FILE, DEFAULT_DOCKER_IMAGE, DEFAULT_DOCKER_TAG, DipDupCodeGenerator
from dipdup.config import DipDupConfig, LoggingConfig, PostgresDatabaseConfig
Expand Down Expand Up @@ -135,12 +136,15 @@ async def cli(ctx, config: List[str], env_file: List[str], logging_config: str):
@cli.command(help='Run indexing')
@click.option('--reindex', is_flag=True, help='Drop database and start indexing from scratch')
@click.option('--oneshot', is_flag=True, help='Synchronize indexes wia REST and exit without starting WS connection')
@click.option('--forbid-reindexing', is_flag=True, help='Raise exception instead of truncating database when reindexing is triggered')
@click.pass_context
@cli_wrapper
async def run(ctx, reindex: bool, oneshot: bool) -> None:
async def run(ctx, reindex: bool, oneshot: bool, forbid_reindexing: bool) -> None:
config: DipDupConfig = ctx.obj.config
config.initialize()
set_decimal_context(config.package)
if forbid_reindexing:
context.forbid_reindexing = True
dipdup = DipDup(config)
await dipdup.run(reindex, oneshot)

Expand Down
3 changes: 2 additions & 1 deletion src/dipdup/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ async def _generate_callback(self, callback_config: CallbackMixin, sql: bool = F
if sql:
code.append(f"await ctx.execute_sql('{callback_config.callback}')")
if callback_config.callback == 'on_rollback':
code.append("await ctx.reindex(reason='reorg message received')")
code.append('await ctx.reindex(ReindexingReason.ROLLBACK)')
imports.add('from dipdup.context import ReindexingReason')
else:
code.append('...')

Expand Down
21 changes: 18 additions & 3 deletions src/dipdup/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
ContractAlreadyExistsError,
IndexAlreadyExistsError,
InitializationRequiredError,
ReindexingRequiredError,
)
from dipdup.models import Contract
from dipdup.models import Contract, ReindexingReason, Schema
from dipdup.utils import FormattedLogger, iter_files
from dipdup.utils.database import create_schema, drop_schema, move_table, truncate_schema

pending_indexes = deque() # type: ignore
forbid_reindexing = False


# TODO: Dataclasses are cool, everyone loves them. Resolve issue with pydantic serialization.
Expand Down Expand Up @@ -88,10 +90,23 @@ async def restart(self) -> None:
sys.argv.remove('--reindex')
os.execl(sys.executable, sys.executable, *sys.argv)

async def reindex(self, reason: Optional[str] = None) -> None:
async def reindex(self, reason: Optional[Union[str, ReindexingReason]] = None) -> None:
"""Drop all tables or whole database and restart with the same CLI arguments"""
reason_str = reason.value if isinstance(reason, ReindexingReason) else 'unknown'
self.logger.warning('Reindexing initialized, reason: %s', reason_str)

if not reason or isinstance(reason, str):
reason = ReindexingReason.MANUAL

if forbid_reindexing:
schema = await Schema.filter().get()
if schema.reindex:
raise ReindexingRequiredError(schema.reindex)

schema.reindex = reason
await schema.save()
raise ReindexingRequiredError(schema.reindex)

self.logger.warning('Reindexing initialized, reason: %s', reason)
database_config = self.config.database
if isinstance(database_config, PostgresDatabaseConfig):
conn = get_connection(None)
Expand Down
20 changes: 9 additions & 11 deletions src/dipdup/dipdup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from dipdup.datasources.coinbase.datasource import CoinbaseDatasource
from dipdup.datasources.datasource import Datasource, IndexDatasource
from dipdup.datasources.tzkt.datasource import TzktDatasource
from dipdup.exceptions import ConfigInitializationException, DipDupException, ReindexingRequiredError
from dipdup.exceptions import ConfigInitializationException, DipDupException, ReindexingReason
from dipdup.hasura import HasuraGateway
from dipdup.index import BigMapIndex, Index, OperationIndex
from dipdup.models import BigMapData, Contract, HeadBlockData
Expand Down Expand Up @@ -125,11 +125,11 @@ async def _load_index_states(self) -> None:
if isinstance(index_config, IndexTemplateConfig):
raise ConfigInitializationException
if index_config.hash() != index_state.config_hash:
await self._ctx.reindex(reason='config has been modified')
await self._ctx.reindex(ReindexingReason.CONFIG_HASH_MISMATCH)

elif template:
if template not in self._ctx.config.templates:
await self._ctx.reindex(reason=f'template `{template}` has been removed from config')
await self._ctx.reindex(ReindexingReason.MISSING_INDEX_TEMPLATE)
await self._ctx.add_index(name, template, template_values)

else:
Expand Down Expand Up @@ -275,8 +275,8 @@ async def _initialize_schema(self) -> None:
except OperationalError:
self._schema = None
# TODO: Fix Tortoise ORM to raise more specific exception
except KeyError as e:
raise ReindexingRequiredError from e
except KeyError:
await self._ctx.reindex(ReindexingReason.SCHEMA_HASH_MISMATCH)

schema_hash = get_schema_hash(conn)

Expand All @@ -290,13 +290,11 @@ async def _initialize_schema(self) -> None:
)
try:
await self._schema.save()
except OperationalError as e:
raise ReindexingRequiredError from e
except OperationalError:
await self._ctx.reindex(ReindexingReason.SCHEMA_HASH_MISMATCH)

elif self._schema.hash != schema_hash:
# FIXME: It seems like this check is broken in some cases
# await self._ctx.reindex(reason='schema hash mismatch')
self._logger.error('Schema hash mismatch, reindex may be required')
await self._ctx.reindex(ReindexingReason.SCHEMA_HASH_MISMATCH)

await self._ctx.fire_hook('on_restart')

Expand All @@ -310,7 +308,7 @@ async def _set_up_database(self, stack: AsyncExitStack, reindex: bool) -> None:
await stack.enter_async_context(tortoise_wrapper(url, models, timeout or 60))

if reindex:
await self._ctx.reindex(reason='run with `--reindex` option')
await self._ctx.reindex(ReindexingReason.CLI_OPTION)

async def _set_up_hooks(self) -> None:
for hook_config in default_hooks.values():
Expand Down
26 changes: 21 additions & 5 deletions src/dipdup/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import traceback
from contextlib import contextmanager
from dataclasses import dataclass
from enum import Enum
from typing import Any, Iterator, Optional, Type

from tabulate import tabulate
Expand Down Expand Up @@ -100,6 +101,17 @@ def _help(self) -> str:
"""


class ReindexingReason(Enum):
    """Closed set of causes for which reindexing can be triggered.

    The value of each member is a human-readable explanation of the cause,
    suitable for log records and error messages.
    """

    # Requested from user code without a more specific cause
    MANUAL = 'triggered manually from callback'
    # A project migration was applied
    MIGRATION = 'applied migration requires reindexing'
    # Requested explicitly on the command line
    CLI_OPTION = 'run with `--reindex` option'
    # A chain reorg was reported and could not be handled in place
    ROLLBACK = 'reorg message received and can\'t be processed'
    # Stored index config hash differs from the current one
    CONFIG_HASH_MISMATCH = 'index config has been modified'
    # Stored database schema hash differs from the current one
    SCHEMA_HASH_MISMATCH = 'database schema has been modified'
    # Stored head block hash differs from the one the datasource reports
    BLOCK_HASH_MISMATCH = 'block hash mismatch, missed rollback when DipDup was stopped'
    # A stored index refers to a template that is no longer available
    MISSING_INDEX_TEMPLATE = 'index template is missing, can\'t restore index state'


@dataclass(frozen=True, repr=False)
class MigrationRequiredError(DipDupError):
"""Project and DipDup spec versions don't match"""
Expand All @@ -116,7 +128,7 @@ def _help(self) -> str:
],
headers=['', 'spec_version', 'DipDup version'],
)
reindex = _tab + ReindexingRequiredError().help() if self.reindex else ''
reindex = _tab + ReindexingRequiredError(ReindexingReason.MIGRATION).help() if self.reindex else ''
return f"""
Project migration required!

Expand All @@ -133,14 +145,18 @@ def _help(self) -> str:
class ReindexingRequiredError(DipDupError):
"""Performed migration requires reindexing"""

reason: ReindexingReason

def _help(self) -> str:
return """
return f"""
Reindexing required!

Recent changes in the framework have made it necessary to reindex the project.
Reason: {self.reason.value}

You may want to backup database before proceeding. After that perform one of the following actions:

1. Optionally backup a database
2. Run `dipdup run --reindex`
* Eliminate the cause of reindexing and run `UPDATE dipdup_schema SET reindex = NULL;`
* Run `dipdup run --reindex` to truncate database and start indexing from scratch
"""


Expand Down
6 changes: 3 additions & 3 deletions src/dipdup/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)
from dipdup.context import DipDupContext
from dipdup.datasources.tzkt.datasource import BigMapFetcher, OperationFetcher, TzktDatasource
from dipdup.exceptions import ConfigInitializationException, InvalidDataError
from dipdup.exceptions import ConfigInitializationException, InvalidDataError, ReindexingReason
from dipdup.models import BigMapData, BigMapDiff, Head, HeadBlockData
from dipdup.models import Index as IndexState
from dipdup.models import IndexStatus, OperationData, Origination, Transaction
Expand Down Expand Up @@ -75,7 +75,7 @@ async def initialize_state(self, state: Optional[IndexState] = None) -> None:

block = await self._datasource.get_block(self.state.level)
if head.hash != block.hash:
await self._ctx.reindex(reason='block hash mismatch (missed rollback while DipDup was stopped)')
await self._ctx.reindex(ReindexingReason.BLOCK_HASH_MISMATCH)

async def process(self) -> None:
# NOTE: `--oneshot` flag implied
Expand Down Expand Up @@ -227,7 +227,7 @@ async def _process_level_operations(self, level: int, operations: List[Operation
received_hashes = set([op.hash for op in operations])
reused_hashes = received_hashes & expected_hashes
if reused_hashes != expected_hashes:
await self._ctx.reindex(reason='attempted a single level rollback, but arrived block has additional transactions')
await self._ctx.reindex(ReindexingReason.ROLLBACK)

self._rollback_level = None
self._last_hashes = set()
Expand Down
3 changes: 2 additions & 1 deletion src/dipdup/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pydantic.error_wrappers import ValidationError
from tortoise import Model, fields

from dipdup.exceptions import ConfigurationError, DipDupException, InvalidDataError
from dipdup.exceptions import ConfigurationError, DipDupException, InvalidDataError, ReindexingReason

ParameterType = TypeVar('ParameterType', bound=BaseModel)
StorageType = TypeVar('StorageType', bound=BaseModel)
Expand Down Expand Up @@ -267,6 +267,7 @@ class QuoteData:
class Schema(Model):
name = fields.CharField(256, pk=True)
hash = fields.CharField(256)
reindex = fields.CharEnumField(ReindexingReason, null=True)

created_at = fields.DatetimeField(auto_now_add=True)
updated_at = fields.DatetimeField(auto_now=True)
Expand Down