diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 3061f8100..e2aa5a4cd 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -46,7 +46,11 @@ def __init__(self, ctx: DipDupContext) -> None: self._contracts: Set[ContractConfig] = set() self._stopped: bool = False - async def run(self, spawn_datasources_event: Optional[Event]) -> None: + async def run( + self, + spawn_datasources_event: Optional[Event], + start_scheduler_event: Optional[Event], + ) -> None: self._logger.info('Starting index dispatcher') await self._subscribe_to_datasource_events() await self._set_datasource_heads() @@ -57,34 +61,28 @@ async def run(self, spawn_datasources_event: Optional[Event]) -> None: async with slowdown(1.0): await gather(*tasks) - await self._check_states() - indexes_spawned = False with suppress(IndexError): while index := pending_indexes.popleft(): self._indexes[index._config.name] = index indexes_spawned = True if not indexes_spawned: - await self._check_states() + if self._every_index_is(IndexStatus.ONESHOT): + self.stop() - if spawn_datasources_event: + if spawn_datasources_event and not spawn_datasources_event.is_set(): spawn_datasources_event.set() + if start_scheduler_event and not start_scheduler_event.is_set(): + if self._every_index_is(IndexStatus.REALTIME): + start_scheduler_event.set() + def stop(self) -> None: self._stopped = True - async def _check_states(self) -> None: + def _every_index_is(self, status: IndexStatus) -> bool: statuses = [i.state.status for i in self._indexes.values()] - - def _every_index_is(status: IndexStatus) -> bool: - nonlocal statuses - return bool(statuses) and not bool(tuple(filter(partial(ne, status), statuses))) - - # TODO: `on_synchronized` hook? Not sure if we need it. - # if _every_index_is(IndexStatus.REALTIME): ... - - if _every_index_is(IndexStatus.ONESHOT): - self.stop() + return bool(statuses) and not bool(tuple(filter(partial(ne, status), statuses))) async def _fetch_contracts(self) -> None: """Add contracts spawned from context to config""" @@ -218,13 +216,13 @@ async def run(self, reindex: bool, oneshot: bool) -> None: spawn_datasources_event: Optional[Event] = None if not oneshot: - await self._set_up_scheduler(stack, tasks) + start_scheduler_event = await self._set_up_scheduler(stack, tasks) spawn_datasources_event = await self._spawn_datasources(tasks) for name in self._config.indexes: await self._ctx._spawn_index(name) - await self._set_up_index_dispatcher(tasks, spawn_datasources_event) + await self._set_up_index_dispatcher(tasks, spawn_datasources_event, start_scheduler_event) await gather(*tasks) @@ -327,30 +325,38 @@ async def _set_up_datasources(self, stack: AsyncExitStack) -> None: for datasource in self._datasources.values(): await stack.enter_async_context(datasource) - async def _set_up_index_dispatcher(self, tasks: Set[Task], spawn_datasources_event: Optional[Event]) -> None: + async def _set_up_index_dispatcher( + self, + tasks: Set[Task], + spawn_datasources_event: Optional[Event], + start_scheduler_event: Optional[Event], + ) -> None: index_dispatcher = IndexDispatcher(self._ctx) - tasks.add(create_task(index_dispatcher.run(spawn_datasources_event))) + tasks.add(create_task(index_dispatcher.run(spawn_datasources_event, start_scheduler_event))) async def _spawn_datasources(self, tasks: Set[Task]) -> Event: event = Event() - async def _wrapper(): - self._logger.info('Waiting for IndexDispatcher to spawn datasources') + async def _event_wrapper(): + self._logger.info('Waiting for an event to spawn datasources') await event.wait() self._logger.info('Spawning datasources') + _tasks = [create_task(d.run()) for d in self._datasources.values()] await gather(*_tasks) - tasks.add(create_task(_wrapper())) + tasks.add(create_task(_event_wrapper())) return event - async def _set_up_scheduler(self, stack: AsyncExitStack, tasks: Set[Task]) -> None: + async def _set_up_scheduler(self, stack: AsyncExitStack, tasks: Set[Task]) -> Event: job_failed = Event() + event = Event() exception: Optional[Exception] = None @asynccontextmanager async def _context(): try: + self._scheduler.start() yield finally: self._scheduler.shutdown() @@ -365,11 +371,18 @@ async def _watchdog() -> None: await job_failed.wait() raise exception # type: ignore - await stack.enter_async_context(_context()) - tasks.add(create_task(_watchdog())) + async def _event_wrapper(): + self._logger.info('Waiting for an event to start scheduler') + await event.wait() + self._logger.info('Starting scheduler') + + tasks.add(create_task(_watchdog())) - for job_config in self._config.jobs.values(): - add_job(self._ctx, self._scheduler, job_config) + for job_config in self._config.jobs.values(): + add_job(self._ctx, self._scheduler, job_config) - self._scheduler.add_listener(_hook, EVENT_JOB_ERROR) - self._scheduler.start() + self._scheduler.add_listener(_hook, EVENT_JOB_ERROR) + await stack.enter_async_context(_context()) + + tasks.add(create_task(_event_wrapper())) + return event