73 changes: 43 additions & 30 deletions src/dipdup/dipdup.py
@@ -46,7 +46,11 @@ def __init__(self, ctx: DipDupContext) -> None:
         self._contracts: Set[ContractConfig] = set()
         self._stopped: bool = False

-    async def run(self, spawn_datasources_event: Optional[Event]) -> None:
+    async def run(
+        self,
+        spawn_datasources_event: Optional[Event],
+        start_scheduler_event: Optional[Event],
+    ) -> None:
         self._logger.info('Starting index dispatcher')
         await self._subscribe_to_datasource_events()
         await self._set_datasource_heads()
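Both event parameters are `Optional` so the dispatcher also works when nothing is deferred (e.g. oneshot runs, where no datasources are spawned); the `if event and not event.is_set()` guards below then reduce to no-ops. A two-line illustration of the guard (illustrative, not dipdup code):

```python
from asyncio import Event
from typing import Optional


def fire_once(event: Optional[Event]) -> None:
    # Fires only when an event was supplied and has not been set yet
    if event and not event.is_set():
        event.set()
```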
@@ -57,34 +61,28 @@ async def run(self, spawn_datasources_event: Optional[Event]) -> None:
             async with slowdown(1.0):
                 await gather(*tasks)

-            await self._check_states()
-
             indexes_spawned = False
             with suppress(IndexError):
                 while index := pending_indexes.popleft():
                     self._indexes[index._config.name] = index
                     indexes_spawned = True
             if not indexes_spawned:
-                await self._check_states()
+                if self._every_index_is(IndexStatus.ONESHOT):
+                    self.stop()

-            if spawn_datasources_event:
+            if spawn_datasources_event and not spawn_datasources_event.is_set():
                 spawn_datasources_event.set()

+            if start_scheduler_event and not start_scheduler_event.is_set():
+                if self._every_index_is(IndexStatus.REALTIME):
+                    start_scheduler_event.set()
+
     def stop(self) -> None:
         self._stopped = True

-    async def _check_states(self) -> None:
+    def _every_index_is(self, status: IndexStatus) -> bool:
         statuses = [i.state.status for i in self._indexes.values()]
-
-        def _every_index_is(status: IndexStatus) -> bool:
-            nonlocal statuses
-            return bool(statuses) and not bool(tuple(filter(partial(ne, status), statuses)))
-
-        # TODO: `on_synchronized` hook? Not sure if we need it.
-        # if _every_index_is(IndexStatus.REALTIME): ...
-
-        if _every_index_is(IndexStatus.ONESHOT):
-            self.stop()
+        return bool(statuses) and not bool(tuple(filter(partial(ne, status), statuses)))

     async def _fetch_contracts(self) -> None:
         """Add contracts spawned from context to config"""
@@ -218,13 +216,13 @@ async def run(self, reindex: bool, oneshot: bool) -> None:

             spawn_datasources_event: Optional[Event] = None
             if not oneshot:
-                await self._set_up_scheduler(stack, tasks)
+                start_scheduler_event = await self._set_up_scheduler(stack, tasks)
                 spawn_datasources_event = await self._spawn_datasources(tasks)

             for name in self._config.indexes:
                 await self._ctx._spawn_index(name)

-            await self._set_up_index_dispatcher(tasks, spawn_datasources_event)
+            await self._set_up_index_dispatcher(tasks, spawn_datasources_event, start_scheduler_event)

             await gather(*tasks)

@@ -327,30 +325,38 @@ async def _set_up_datasources(self, stack: AsyncExitStack) -> None:
         for datasource in self._datasources.values():
             await stack.enter_async_context(datasource)

-    async def _set_up_index_dispatcher(self, tasks: Set[Task], spawn_datasources_event: Optional[Event]) -> None:
+    async def _set_up_index_dispatcher(
+        self,
+        tasks: Set[Task],
+        spawn_datasources_event: Optional[Event],
+        start_scheduler_event: Optional[Event],
+    ) -> None:
         index_dispatcher = IndexDispatcher(self._ctx)
-        tasks.add(create_task(index_dispatcher.run(spawn_datasources_event)))
+        tasks.add(create_task(index_dispatcher.run(spawn_datasources_event, start_scheduler_event)))

     async def _spawn_datasources(self, tasks: Set[Task]) -> Event:
         event = Event()

-        async def _wrapper():
-            self._logger.info('Waiting for IndexDispatcher to spawn datasources')
+        async def _event_wrapper():
+            self._logger.info('Waiting for an event to spawn datasources')
             await event.wait()
             self._logger.info('Spawning datasources')

             _tasks = [create_task(d.run()) for d in self._datasources.values()]
             await gather(*_tasks)

-        tasks.add(create_task(_wrapper()))
+        tasks.add(create_task(_event_wrapper()))
         return event

-    async def _set_up_scheduler(self, stack: AsyncExitStack, tasks: Set[Task]) -> None:
+    async def _set_up_scheduler(self, stack: AsyncExitStack, tasks: Set[Task]) -> Event:
         job_failed = Event()
+        event = Event()
         exception: Optional[Exception] = None

         @asynccontextmanager
         async def _context():
             try:
+                self._scheduler.start()
                 yield
             finally:
                 self._scheduler.shutdown()
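Both `_spawn_datasources` and `_set_up_scheduler` now follow the same shape: schedule a task that parks on an `asyncio.Event` and hand the event back to the caller, so the IndexDispatcher decides when the deferred work actually begins. A standalone sketch of that pattern (names here are illustrative, not dipdup API):

```python
import asyncio
from typing import Set


def set_up_deferred(tasks: Set[asyncio.Task]) -> asyncio.Event:
    event = asyncio.Event()

    async def _event_wrapper() -> None:
        # Parked until whoever holds the event decides the work may begin
        await event.wait()
        print('event fired, starting deferred work')

    tasks.add(asyncio.create_task(_event_wrapper()))
    return event


async def main() -> None:
    tasks: Set[asyncio.Task] = set()
    start = set_up_deferred(tasks)
    await asyncio.sleep(0.1)   # deferred work has not run yet
    start.set()                # fire the event; the wrapper proceeds
    await asyncio.gather(*tasks)


asyncio.run(main())
```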
@@ -365,11 +371,18 @@ async def _watchdog() -> None:
             await job_failed.wait()
             raise exception  # type: ignore

-        await stack.enter_async_context(_context())
-        tasks.add(create_task(_watchdog()))
+        async def _event_wrapper():
+            self._logger.info('Waiting for an event to start scheduler')
+            await event.wait()
+            self._logger.info('Starting scheduler')
+
+            tasks.add(create_task(_watchdog()))

-        for job_config in self._config.jobs.values():
-            add_job(self._ctx, self._scheduler, job_config)
+            for job_config in self._config.jobs.values():
+                add_job(self._ctx, self._scheduler, job_config)

-        self._scheduler.add_listener(_hook, EVENT_JOB_ERROR)
-        self._scheduler.start()
+            self._scheduler.add_listener(_hook, EVENT_JOB_ERROR)
+            await stack.enter_async_context(_context())
+
+        tasks.add(create_task(_event_wrapper()))
+        return event
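Taken together, the control flow after this change is: the dispatcher runs first, sets `spawn_datasources_event` once it has spawned its indexes, and sets `start_scheduler_event` only when every index reports REALTIME; datasources and the scheduler each sit parked behind their event. A condensed sketch of that handshake (hypothetical names; the real coordination lives in `IndexDispatcher.run`):

```python
import asyncio


async def dispatcher(spawn_datasources: asyncio.Event, start_scheduler: asyncio.Event) -> None:
    # First pass over the indexes: datasources may now be spawned.
    spawn_datasources.set()
    await asyncio.sleep(0.1)   # indexes sync toward realtime...
    start_scheduler.set()      # ...all REALTIME: jobs may now run


async def datasources(spawn: asyncio.Event) -> None:
    await spawn.wait()
    print('datasources running')


async def scheduler(start: asyncio.Event) -> None:
    await start.wait()
    print('scheduler started')


async def main() -> None:
    spawn, start = asyncio.Event(), asyncio.Event()
    await asyncio.gather(dispatcher(spawn, start), datasources(spawn), scheduler(start))


asyncio.run(main())
```

The ordering guarantee comes only from the events: neither the datasources nor the scheduler polls index state, so jobs cannot fire against a half-synchronized database.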