Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/dipdup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ def parent(self) -> Optional[T]:

@parent.setter
def parent(self, config: T) -> None:
if self._parent:
raise RuntimeError("Can't unset parent once set")
if self._parent and config is not self._parent:
raise ConfigInitializationException
self._parent = config


Expand Down Expand Up @@ -525,6 +525,8 @@ def callback_fn(self, fn: Callable) -> None:
self._callback_fn = fn

def initialize_callback_fn(self, package: str):
if self._callback_fn:
return
_logger.debug('Registering %s callback `%s`', self.kind, self.callback)
module_name = f'{package}.{self.kind}s.{self.callback}'
fn_name = self.callback
Expand Down Expand Up @@ -1122,6 +1124,7 @@ def _resolve_index_links(self, index_config: IndexConfigT) -> None:
index_config.datasource = self.get_tzkt_datasource(index_config.datasource)

for handler in index_config.handlers:
handler.parent = index_config
# TODO: Verify callback uniqueness
# self._callback_patterns[handler.callback].append(handler.pattern)
if isinstance(handler.contract, str):
Expand Down
100 changes: 64 additions & 36 deletions src/dipdup/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import contextmanager, suppress
from os.path import exists, join
from pprint import pformat
from typing import Any, Dict, Iterator, Optional, Union, cast
from typing import Any, Dict, Iterator, Optional, Tuple, Union, cast

import sqlparse # type: ignore
from tortoise import Tortoise
Expand All @@ -19,7 +19,6 @@
DipDupConfig,
HandlerConfig,
HookConfig,
IndexConfig,
IndexTemplateConfig,
OperationIndexConfig,
PostgresDatabaseConfig,
Expand Down Expand Up @@ -59,11 +58,25 @@ def __init__(
def __str__(self) -> str:
return pformat(self.__dict__)

async def fire_hook(self, name: str, fmt: Optional[str] = None, *args, **kwargs: Any) -> None:
async def fire_hook(
self,
name: str,
fmt: Optional[str] = None,
*args,
**kwargs: Any,
) -> None:
await self.callbacks.fire_hook(self, name, fmt, *args, **kwargs)

async def fire_handler(self, name: str, datasource: Datasource, fmt: Optional[str] = None, *args, **kwargs: Any) -> None:
await self.callbacks.fire_handler(self, name, datasource, fmt, *args, **kwargs)
async def fire_handler(
self,
name: str,
index: str,
datasource: Datasource,
fmt: Optional[str] = None,
*args,
**kwargs: Any,
) -> None:
await self.callbacks.fire_handler(self, name, index, datasource, fmt, *args, **kwargs)

async def execute_sql(self, name: str) -> None:
await self.callbacks.execute_sql(self, name)
Expand Down Expand Up @@ -200,50 +213,54 @@ def __init__(
self.logger = logger
self.handler_config = handler_config
self.datasource = datasource
template_values = cast(IndexConfig, handler_config.parent).template_values if handler_config.parent else {}
template_values = handler_config.parent.template_values if handler_config.parent else {}
self.template_values = TemplateValuesDict(self, **template_values)


class CallbackManager:
def __init__(self, package: str) -> None:
self._logger = logging.getLogger('dipdup.callback')
self._package = package
self._handlers: Dict[str, HandlerConfig] = {}
self._handlers: Dict[Tuple[str, str], HandlerConfig] = {}
self._hooks: Dict[str, HookConfig] = {}

def register_handler(self, handler_config: HandlerConfig) -> None:
if handler_config.callback not in self._handlers:
self._handlers[handler_config.callback] = handler_config
if not handler_config.parent:
raise RuntimeError('Handler must have a parent index')

# NOTE: Same handlers can be linked to different indexes, we need to use exact config
key = (handler_config.callback, handler_config.parent.name)
if key not in self._handlers:
self._handlers[key] = handler_config
handler_config.initialize_callback_fn(self._package)

def register_hook(self, hook_config: HookConfig) -> None:
if hook_config.callback not in self._hooks:
self._hooks[hook_config.callback] = hook_config
key = hook_config.callback
if key not in self._hooks:
self._hooks[key] = hook_config
hook_config.initialize_callback_fn(self._package)

async def fire_handler(
self,
ctx: 'DipDupContext',
name: str,
index: str,
datasource: Datasource,
fmt: Optional[str] = None,
*args,
**kwargs: Any,
) -> None:
try:
new_ctx = HandlerContext(
datasources=ctx.datasources,
config=ctx.config,
callbacks=ctx.callbacks,
logger=FormattedLogger(f'dipdup.handlers.{name}', fmt),
handler_config=self._handlers[name],
datasource=datasource,
)
except KeyError as e:
raise ConfigurationError(f'Attempt to fire unregistered handler `{name}`') from e

handler_config = self._get_handler(name, index)
new_ctx = HandlerContext(
datasources=ctx.datasources,
config=ctx.config,
callbacks=ctx.callbacks,
logger=FormattedLogger(f'dipdup.handlers.{name}', fmt),
handler_config=handler_config,
datasource=datasource,
)
with self._wrapper('handler', name):
await new_ctx.handler_config.callback_fn(new_ctx, *args, **kwargs)
await handler_config.callback_fn(new_ctx, *args, **kwargs)

async def fire_hook(
self,
Expand All @@ -253,20 +270,18 @@ async def fire_hook(
*args,
**kwargs: Any,
) -> None:
try:
ctx = HookContext(
datasources=ctx.datasources,
config=ctx.config,
callbacks=ctx.callbacks,
logger=FormattedLogger(f'dipdup.hooks.{name}', fmt),
hook_config=self._hooks[name],
)
except KeyError as e:
raise ConfigurationError(f'Attempt to fire unregistered hook `{name}`') from e
hook_config = self._get_hook(name)
new_ctx = HookContext(
datasources=ctx.datasources,
config=ctx.config,
callbacks=ctx.callbacks,
logger=FormattedLogger(f'dipdup.hooks.{name}', fmt),
hook_config=hook_config,
)

self._verify_arguments(ctx, *args, **kwargs)
self._verify_arguments(new_ctx, *args, **kwargs)
with self._wrapper('hook', name):
await ctx.hook_config.callback_fn(ctx, *args, **kwargs)
await hook_config.callback_fn(ctx, *args, **kwargs)

async def execute_sql(self, ctx: 'DipDupContext', name: str) -> None:
"""Execute SQL included with project"""
Expand Down Expand Up @@ -328,3 +343,16 @@ def _verify_arguments(cls, ctx: HookContext, *args, **kwargs) -> None:
type_=type(arg),
expected_type=expected_type,
)

def _get_handler(self, name: str, index: str) -> HandlerConfig:
try:
return self._handlers[(name, index)]
except KeyError as e:
raise ConfigurationError(f'Attempt to fire unregistered handler `{name}` of index `{index}`') from e

def _get_hook(self, name: str) -> HookConfig:

try:
return self._hooks[name]
except KeyError as e:
raise ConfigurationError(f'Attempt to fire unregistered hook `{name}`') from e
2 changes: 1 addition & 1 deletion src/dipdup/datasources/tzkt/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ async def _extract_message_data(self, channel: str, message: List[Any]) -> Async
if self._level and head_level < self._level:
raise RuntimeError('Received data message from level lower than current: {head_level} < {self._level}')

# TODO: State messages will be dropped from TzKT
# NOTE: State messages will be replaced with negotiation some day
if message_type == TzktMessageType.STATE:
if self._sync_level != head_level:
self._logger.info('Datasource level set to %s', head_level)
Expand Down
9 changes: 8 additions & 1 deletion src/dipdup/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
from dipdup import __version__
from dipdup.config import HTTPConfig # type: ignore

safe_exceptions = (
aiohttp.ClientConnectionError,
aiohttp.ClientConnectorError,
aiohttp.ClientResponseError,
aiohttp.ClientPayloadError,
)


class HTTPGateway(ABC):
"""Base class for datasources which connect to remote HTTP endpoints"""
Expand Down Expand Up @@ -108,7 +115,7 @@ async def _retry_request(self, method: str, url: str, weight: int = 1, **kwargs)
weight=weight,
**kwargs,
)
except (aiohttp.ClientConnectionError, aiohttp.ClientConnectorError, aiohttp.ClientResponseError) as e:
except safe_exceptions as e:
if self._config.retry_count and attempt - 1 == self._config.retry_count:
raise e

Expand Down
6 changes: 6 additions & 0 deletions src/dipdup/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ async def _on_match(
):
"""Prepare handler arguments, parse parameter and storage. Schedule callback in executor."""
self._logger.info('%s: `%s` handler matched!', operation_subgroup.hash, handler_config.callback)
if not handler_config.parent:
raise RuntimeError('Handler must have a parent')

args: List[Optional[Union[Transaction, Origination, OperationData]]] = []
for pattern_config, operation in zip(handler_config.pattern, matched_operations):
Expand Down Expand Up @@ -367,6 +369,7 @@ async def _on_match(

await self._ctx.fire_handler(
handler_config.callback,
handler_config.parent.name,
self.datasource,
operation_subgroup.hash + ': {}',
*args,
Expand Down Expand Up @@ -474,6 +477,8 @@ async def _on_match(
) -> None:
"""Prepare handler arguments, parse key and value. Schedule callback in executor."""
self._logger.info('%s: `%s` handler matched!', matched_big_map.operation_id, handler_config.callback)
if not handler_config.parent:
raise RuntimeError('Handler must have a parent')

if matched_big_map.action.has_key:
key_type = handler_config.key_type_cls
Expand Down Expand Up @@ -502,6 +507,7 @@ async def _on_match(

await self._ctx.fire_handler(
handler_config.callback,
handler_config.parent.name,
self.datasource,
# FIXME: missing `operation_id` field in API to identify operation
None,
Expand Down