From 649d2027fdee02486de54a475ae2513831307046 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Sun, 20 Jun 2021 11:04:29 +0300 Subject: [PATCH 1/5] Cron jobs WIP --- poetry.lock | 47 ++++++++++++++++++++++++++++++++++++++++- pyproject.toml | 1 + src/dipdup/config.py | 18 ++++++++++++++++ src/dipdup/dipdup.py | 11 ++++++++++ src/dipdup/scheduler.py | 34 +++++++++++++++++++++++++++++ 5 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 src/dipdup/scheduler.py diff --git a/poetry.lock b/poetry.lock index db528305c..da2b813ae 100644 --- a/poetry.lock +++ b/poetry.lock @@ -63,6 +63,32 @@ category = "main" optional = false python-versions = "*" +[[package]] +name = "apscheduler" +version = "3.7.0" +description = "In-process task scheduler with Cron-like capabilities" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" + +[package.dependencies] +pytz = "*" +six = ">=1.4.0" +tzlocal = ">=2.0,<3.0" + +[package.extras] +asyncio = ["trollius"] +doc = ["sphinx", "sphinx-rtd-theme"] +gevent = ["gevent"] +mongodb = ["pymongo (>=3.0)"] +redis = ["redis (>=3.0)"] +rethinkdb = ["rethinkdb (>=2.4.0)"] +sqlalchemy = ["sqlalchemy (>=0.8)"] +testing = ["pytest (<6)", "pytest-cov", "pytest-tornado5", "mock", "pytest-asyncio (<0.6)", "pytest-asyncio"] +tornado = ["tornado (>=4.3)"] +twisted = ["twisted"] +zookeeper = ["kazoo"] + [[package]] name = "argcomplete" version = "1.12.3" @@ -803,6 +829,17 @@ category = "main" optional = false python-versions = "*" +[[package]] +name = "tzlocal" +version = "2.1" +description = "tzinfo object for the local timezone" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +pytz = "*" + [[package]] name = "urllib3" version = "1.26.5" @@ -847,7 +884,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "71eafe3fcb2efab8e543829a272fb412822d040af19ea114f79e3ead170b4e71" +content-hash = "a14f9c21cb1efb9ce88326df8c1043baca6b812faa496094d1bdeb31203a615c" [metadata.files] aiohttp = [ @@ -905,6 +942,10 @@ appdirs = [ {file = "appdirs-1.4.4-py2.py3-none-any.whl", hash = "sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128"}, {file = "appdirs-1.4.4.tar.gz", hash = "sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41"}, ] +apscheduler = [ + {file = "APScheduler-3.7.0-py2.py3-none-any.whl", hash = "sha256:c06cc796d5bb9eb3c4f77727f6223476eb67749e7eea074d1587550702a7fbe3"}, + {file = "APScheduler-3.7.0.tar.gz", hash = "sha256:1cab7f2521e107d07127b042155b632b7a1cd5e02c34be5a28ff62f77c900c6a"}, +] argcomplete = [ {file = "argcomplete-1.12.3-py2.py3-none-any.whl", hash = "sha256:291f0beca7fd49ce285d2f10e4c1c77e9460cf823eef2de54df0c0fec88b0d81"}, {file = "argcomplete-1.12.3.tar.gz", hash = "sha256:2c7dbffd8c045ea534921e63b0be6fe65e88599990d8dc408ac8c542b72a5445"}, @@ -1500,6 +1541,10 @@ typing-extensions = [ {file = "typing_extensions-3.10.0.0-py3-none-any.whl", hash = "sha256:779383f6086d90c99ae41cf0ff39aac8a7937a9283ce0a414e5dd782f4c94a84"}, {file = "typing_extensions-3.10.0.0.tar.gz", hash = "sha256:50b6f157849174217d0656f99dc82fe932884fb250826c18350e159ec6cdf342"}, ] +tzlocal = [ + {file = "tzlocal-2.1-py2.py3-none-any.whl", hash = "sha256:e2cb6c6b5b604af38597403e9852872d7f534962ae2954c7f35efcb1ccacf4a4"}, + {file = "tzlocal-2.1.tar.gz", hash = "sha256:643c97c5294aedc737780a49d9df30889321cbe1204eac2c2ec6134035a92e44"}, +] urllib3 = [ {file = "urllib3-1.26.5-py2.py3-none-any.whl", hash = "sha256:753a0374df26658f99d826cfe40394a686d05985786d946fbe4165b5148f5a7c"}, {file = "urllib3-1.26.5.tar.gz", hash = "sha256:a7acd0977125325f516bda9735fa7142b909a8d01e8b2e4c8108d0984e6e0098"}, diff --git a/pyproject.toml b/pyproject.toml index 83a5f9566..17d3bbeee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ aiosignalrcore = "^0.9.2" fcache = "^0.4.7" click = "^8.0.1" pyee = "^8.1.0" +APScheduler = "^3.7.0" [tool.poetry.dev-dependencies] black = "^20.8b1" diff --git a/src/dipdup/config.py b/src/dipdup/config.py index 430615015..0fc34c505 100644 --- a/src/dipdup/config.py +++ b/src/dipdup/config.py @@ -623,6 +623,13 @@ def valid_url(cls, v): return v +@dataclass +class JobConfig(HandlerConfig): + crontab: str + args: Optional[Dict[str, Any]] = None + atomic: bool = False + + @dataclass class DipDupConfig: """Main dapp config @@ -646,6 +653,7 @@ class DipDupConfig: templates: Optional[Dict[str, IndexConfigTemplateT]] = None database: Union[SqliteDatabaseConfig, MySQLDatabaseConfig, PostgresDatabaseConfig] = SqliteDatabaseConfig(kind='sqlite') hasura: Optional[HasuraConfig] = None + jobs: Optional[List[JobConfig]] = None def __post_init_post_parse__(self): self._callback_patterns: Dict[str, List[Sequence[HandlerPatternConfigT]]] = defaultdict(list) @@ -840,6 +848,12 @@ def _initialize_handler_callback(self, handler_config: HandlerConfig) -> None: callback_fn = getattr(handler_module, handler_config.callback) handler_config.callback_fn = callback_fn + def _initialize_job_callback(self, job_config: JobConfig) -> None: + _logger.info('Registering job callback `%s`', job_config.callback) + job_module = importlib.import_module(f'{self.package}.jobs.{job_config.callback}') + callback_fn = getattr(job_module, job_config.callback) + job_config.callback_fn = callback_fn + def _initialize_index(self, index_name: str, index_config: IndexConfigT) -> None: if index_name in self._initialized: return @@ -897,6 +911,10 @@ def _initialize_index(self, index_name: str, index_config: IndexConfigT) -> None self._initialized.append(index_name) + def _initialize_jobs(self) -> None: + for job_config in self.jobs or (): + self._initialize_job_callback(job_config) + def initialize(self) -> None: _logger.info('Setting up handlers and types for package `%s`', self.package) diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 3454060cd..146dd00c7 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -1,4 +1,5 @@ import asyncio +from contextlib import suppress import hashlib import logging from functools import partial @@ -34,6 +35,8 @@ from dipdup.hasura import configure_hasura from dipdup.index import BigMapIndex, HandlerContext, Index, OperationIndex from dipdup.models import BigMapData, IndexType, OperationData, State +from dipdup.scheduler import add_job, create_scheduler +from apscheduler.schedulers import SchedulerNotRunningError class IndexDispatcher: @@ -150,6 +153,7 @@ def __init__(self, config: DipDupConfig) -> None: template_values=None, ) self._index_dispatcher = IndexDispatcher(self._ctx) + self._scheduler = create_scheduler() async def init(self) -> None: """Create new or update existing dipdup project""" @@ -184,6 +188,11 @@ async def run(self, reindex: bool, oneshot: bool) -> None: if self._config.hasura: worker_tasks.append(asyncio.create_task(configure_hasura(self._config))) + if self._config.jobs: + for job_config in self._config.jobs: + add_job(self._scheduler, job_config) + await self._scheduler.start() + worker_tasks.append(asyncio.create_task(self._index_dispatcher.run(oneshot))) try: @@ -193,6 +202,8 @@ async def run(self, reindex: bool, oneshot: bool) -> None: self._logger.info('Closing datasource sessions') await asyncio.gather(*[d.close_session() for d in self._datasources.values()]) + with suppress(SchedulerNotRunningError): + await self._scheduler.shutdown(wait=True) async def migrate(self) -> None: codegen = DipDupCodeGenerator(self._config, self._datasources_by_config) diff --git a/src/dipdup/scheduler.py b/src/dipdup/scheduler.py new file mode 100644 index 000000000..2e98b0fa5 --- /dev/null +++ b/src/dipdup/scheduler.py @@ -0,0 +1,34 @@ +from pytz import utc + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.executors.asyncio import AsyncIOExecutor +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.triggers.cron import CronTrigger +from dipdup.config import JobConfig + + +jobstores = { + 'default': MemoryJobStore() +} +executors = { + 'default': AsyncIOExecutor(), +} +job_defaults = { + 'coalesce': False, + 'max_instances': 3 +} + +def create_scheduler() -> AsyncIOScheduler: + return AsyncIOScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc,) + +def add_job(scheduler: AsyncIOScheduler, job_config: JobConfig) -> None: + if job_config.atomic: + raise NotImplementedError + trigger = CronTrigger.from_crontab(job_config.crontab) + scheduler.add_job( + func=job_config.callback_fn, + id=job_config.callback, + name=job_config.callback, + trigger=trigger, + kwargs=job_config.args, + ) From 69638c86f207d5583ea463bbd9e490e8c6e71e18 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Mon, 28 Jun 2021 13:54:34 +0300 Subject: [PATCH 2/5] WIP --- src/dipdup/codegen.py | 24 ++++++++++++++++++++++++ src/dipdup/config.py | 8 +++++++- src/dipdup/dipdup.py | 5 +++-- src/dipdup/scheduler.py | 6 +++--- src/dipdup/templates/job.py.j2 | 8 ++++++++ 5 files changed, 45 insertions(+), 6 deletions(-) create mode 100644 src/dipdup/templates/job.py.j2 diff --git a/src/dipdup/codegen.py b/src/dipdup/codegen.py index 0b83ceefe..911934e1c 100644 --- a/src/dipdup/codegen.py +++ b/src/dipdup/codegen.py @@ -80,6 +80,13 @@ async def create_package(self) -> None: with open(join(handlers_path, '__init__.py'), 'w'): pass + self._logger.info('Creating `%s.jobs` package', self._config.package) + jobs_path = join(self._config.package_path, 'jobs') + with suppress(FileExistsError): + mkdir(jobs_path) + with open(join(jobs_path, '__init__.py'), 'w'): + pass + self._logger.info('Creating `%s/sql` directory', self._config.package) sql_path = join(self._config.package_path, 'sql') with suppress(FileExistsError): @@ -315,6 +322,23 @@ async def generate_user_handlers(self) -> None: else: raise NotImplementedError(f'Index kind `{index_config.kind}` is not supported') + async def generate_jobs(self) -> None: + if not self._config.jobs: + return + + jobs_path = join(self._config.package_path, 'jobs') + with open(join(dirname(__file__), 'templates', f'job.py.j2')) as file: + job_template = Template(file.read()) + + job_callbacks = set(job_config.callback for job_config in self._config.jobs.values()) + for job_callback in job_callbacks: + self._logger.info('Generating job `%s`', job_callback) + job_code = job_template.render(job=job_callback) + job_path = join(jobs_path, f'{job_callback}.py') + if not exists(job_path): + with open(job_path, 'w') as file: + file.write(job_code) + async def cleanup(self) -> None: """Remove fetched JSONSchemas""" self._logger.info('Cleaning up') diff --git a/src/dipdup/config.py b/src/dipdup/config.py index c8560aa7b..1e0bde920 100644 --- a/src/dipdup/config.py +++ b/src/dipdup/config.py @@ -424,6 +424,10 @@ class HandlerConfig: def __post_init_post_parse__(self): self._callback_fn = None + if self.callback in (ROLLBACK_HANDLER, CONFIGURE_HANDLER): + raise ConfigurationError(f'`{self.callback}` callback name is reserved') + if self.callback and self.callback != pascal_to_snake(self.callback): + raise ConfigurationError('`callback` field must conform to snake_case naming style') @property def callback_fn(self) -> Callable: @@ -615,6 +619,7 @@ class JobConfig(HandlerConfig): atomic: bool = False +@dataclass class SentryConfig: dsn: str @@ -631,6 +636,7 @@ class DipDupConfig: :param templates: Mapping of template aliases and index templates :param database: Database config :param hasura: Hasura config + :param jobs: Mapping of job aliases and job configs :param sentry: Sentry integration config """ @@ -642,7 +648,7 @@ class DipDupConfig: templates: Optional[Dict[str, IndexConfigTemplateT]] = None database: Union[SqliteDatabaseConfig, PostgresDatabaseConfig] = SqliteDatabaseConfig(kind='sqlite') hasura: Optional[HasuraConfig] = None - jobs: Optional[List[JobConfig]] = None + jobs: Optional[Dict[str, JobConfig]] = None sentry: Optional[SentryConfig] = None def __post_init_post_parse__(self): diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 8b2d63922..a49a5e5b1 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -163,6 +163,7 @@ async def init(self) -> None: await codegen.generate_types() await codegen.generate_default_handlers() await codegen.generate_user_handlers() + await codegen.generate_jobs() await codegen.cleanup() for datasource in self._datasources.values(): @@ -187,8 +188,8 @@ async def run(self, reindex: bool, oneshot: bool) -> None: worker_tasks.append(asyncio.create_task(configure_hasura(self._config))) if self._config.jobs: - for job_config in self._config.jobs: - add_job(self._scheduler, job_config) + for job_name, job_config in self._config.jobs.items(): + add_job(self._scheduler, job_name, job_config) await self._scheduler.start() worker_tasks.append(asyncio.create_task(self._index_dispatcher.run(oneshot))) diff --git a/src/dipdup/scheduler.py b/src/dipdup/scheduler.py index 15739ea98..6e52afbad 100644 --- a/src/dipdup/scheduler.py +++ b/src/dipdup/scheduler.py @@ -27,14 +27,14 @@ def create_scheduler() -> AsyncIOScheduler: ) -def add_job(scheduler: AsyncIOScheduler, job_config: JobConfig) -> None: +def add_job(scheduler: AsyncIOScheduler, job_name: str, job_config: JobConfig) -> None: if job_config.atomic: raise NotImplementedError trigger = CronTrigger.from_crontab(job_config.crontab) scheduler.add_job( func=job_config.callback_fn, - id=job_config.callback, - name=job_config.callback, + id=job_name, + name=job_name, trigger=trigger, kwargs=job_config.args, ) diff --git a/src/dipdup/templates/job.py.j2 b/src/dipdup/templates/job.py.j2 new file mode 100644 index 000000000..783036f8f --- /dev/null +++ b/src/dipdup/templates/job.py.j2 @@ -0,0 +1,8 @@ + +from typing import Any, Dict + +from dipdup.context import DipDupContext + + +async def {{job}}(self, ctx: DipDupContext, args: Dict[str, Any]) -> None: + ... From dff775a5c3c3dfbb4a2bda4ad3754851a225106c Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Mon, 28 Jun 2021 14:45:50 +0300 Subject: [PATCH 3/5] Fixes --- src/dipdup/config.py | 5 ++++- src/dipdup/dipdup.py | 2 +- src/dipdup/scheduler.py | 5 ++++- src/dipdup/templates/job.py.j2 | 2 +- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/dipdup/config.py b/src/dipdup/config.py index 1e0bde920..4fda7fac9 100644 --- a/src/dipdup/config.py +++ b/src/dipdup/config.py @@ -908,7 +908,9 @@ def _initialize_index(self, index_name: str, index_config: IndexConfigT) -> None self._initialized.append(index_name) def _initialize_jobs(self) -> None: - for job_config in self.jobs or (): + if not self.jobs: + return + for job_config in self.jobs.values(): self._initialize_job_callback(job_config) def initialize(self) -> None: @@ -917,6 +919,7 @@ def initialize(self) -> None: self.pre_initialize() for index_name, index_config in self.indexes.items(): self._initialize_index(index_name, index_config) + self._initialize_jobs() @dataclass diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index a49a5e5b1..899143ad3 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -190,7 +190,7 @@ async def run(self, reindex: bool, oneshot: bool) -> None: if self._config.jobs: for job_name, job_config in self._config.jobs.items(): add_job(self._scheduler, job_name, job_config) - await self._scheduler.start() + self._scheduler.start() worker_tasks.append(asyncio.create_task(self._index_dispatcher.run(oneshot))) diff --git a/src/dipdup/scheduler.py b/src/dipdup/scheduler.py index 6e52afbad..a68acf9ea 100644 --- a/src/dipdup/scheduler.py +++ b/src/dipdup/scheduler.py @@ -36,5 +36,8 @@ def add_job(scheduler: AsyncIOScheduler, job_name: str, job_config: JobConfig) - id=job_name, name=job_name, trigger=trigger, - kwargs=job_config.args, + kwargs=dict( + ctx=None, + args=job_config.args, + ), ) diff --git a/src/dipdup/templates/job.py.j2 b/src/dipdup/templates/job.py.j2 index 783036f8f..99d3a32ae 100644 --- a/src/dipdup/templates/job.py.j2 +++ b/src/dipdup/templates/job.py.j2 @@ -4,5 +4,5 @@ from typing import Any, Dict from dipdup.context import DipDupContext -async def {{job}}(self, ctx: DipDupContext, args: Dict[str, Any]) -> None: +async def {{job}}(ctx: DipDupContext, args: Dict[str, Any]) -> None: ... From 42d7ca186a46b1d67ced31ddfb53aef79f98a978 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Mon, 28 Jun 2021 15:05:54 +0300 Subject: [PATCH 4/5] Fixes, test --- src/dipdup/codegen.py | 2 +- src/dipdup/dipdup.py | 5 +- tests/integration_tests/hic_et_nunc_job.yml | 59 +++++++++++++++++++++ tests/integration_tests/test_codegen.py | 4 +- 4 files changed, 65 insertions(+), 5 deletions(-) create mode 100644 tests/integration_tests/hic_et_nunc_job.yml diff --git a/src/dipdup/codegen.py b/src/dipdup/codegen.py index 911934e1c..c9a98006f 100644 --- a/src/dipdup/codegen.py +++ b/src/dipdup/codegen.py @@ -327,7 +327,7 @@ async def generate_jobs(self) -> None: return jobs_path = join(self._config.package_path, 'jobs') - with open(join(dirname(__file__), 'templates', f'job.py.j2')) as file: + with open(join(dirname(__file__), 'templates', 'job.py.j2')) as file: job_template = Template(file.read()) job_callbacks = set(job_config.callback for job_config in self._config.jobs.values()) diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 899143ad3..6d02a2594 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -187,7 +187,7 @@ async def run(self, reindex: bool, oneshot: bool) -> None: if self._config.hasura: worker_tasks.append(asyncio.create_task(configure_hasura(self._config))) - if self._config.jobs: + if self._config.jobs and not oneshot: for job_name, job_config in self._config.jobs.items(): add_job(self._scheduler, job_name, job_config) self._scheduler.start() @@ -201,7 +201,8 @@ async def run(self, reindex: bool, oneshot: bool) -> None: finally: self._logger.info('Closing datasource sessions') await asyncio.gather(*[d.close_session() for d in self._datasources.values()]) - with suppress(SchedulerNotRunningError): + # FIXME: AttributeError: 'NoneType' object has no attribute 'call_soon_threadsafe' + with suppress(AttributeError, SchedulerNotRunningError): await self._scheduler.shutdown(wait=True) async def migrate(self) -> None: diff --git a/tests/integration_tests/hic_et_nunc_job.yml b/tests/integration_tests/hic_et_nunc_job.yml new file mode 100644 index 000000000..aa9762a68 --- /dev/null +++ b/tests/integration_tests/hic_et_nunc_job.yml @@ -0,0 +1,59 @@ +spec_version: 1.0 +package: demo_hic_et_nunc + +database: + kind: sqlite + path: db.sqlite3 + +contracts: + HEN_objkts: + address: ${HEN_OBJKTS:-KT1RJ6PbjHpwc3M5rw5s2Nbmefwbuwbdxton} + typename: hen_objkts + HEN_minter: + address: ${HEN_MINTER:-KT1Hkg5qeNhfwpKW4fXvq7HGZB9z2EnmCCA9} + typename: hen_minter + +datasources: + tzkt_mainnet: + kind: tzkt + url: ${TZKT_URL:-https://api.tzkt.io} + +indexes: + hen_mainnet: + kind: operation + datasource: tzkt_mainnet + contracts: + - HEN_minter + handlers: + - callback: on_mint + pattern: + - type: transaction + destination: HEN_minter + entrypoint: mint_OBJKT + - type: transaction + destination: HEN_objkts + entrypoint: mint + - callback: on_swap + pattern: + - type: transaction + destination: HEN_minter + entrypoint: swap + - callback: on_cancel_swap + pattern: + - type: transaction + destination: HEN_minter + entrypoint: cancel_swap + - callback: on_collect + pattern: + - type: transaction + destination: HEN_minter + entrypoint: collect + first_block: 1365000 + last_block: 1366000 + +jobs: + test: + callback: test_job + crontab: "* * * * *" + args: + foo: bar \ No newline at end of file diff --git a/tests/integration_tests/test_codegen.py b/tests/integration_tests/test_codegen.py index 7e957084d..ec7bbeeec 100644 --- a/tests/integration_tests/test_codegen.py +++ b/tests/integration_tests/test_codegen.py @@ -32,7 +32,7 @@ def import_submodules(package, recursive=True): class CodegenTest(IsolatedAsyncioTestCase): async def test_codegen(self): for name in [ - 'hic_et_nunc.yml', + 'hic_et_nunc_job.yml', 'quipuswap.yml', 'tzcolors.yml', 'tezos_domains_big_map.yml', @@ -41,7 +41,7 @@ async def test_codegen(self): with self.subTest(name): config_path = join(dirname(__file__), name) config = DipDupConfig.load([config_path]) - config.initialize() + config.pre_initialize() config.package = 'tmp_test_dipdup' if config.package in sys.modules: From d9dec5bf20f769ee1b0522ec676e296251027739 Mon Sep 17 00:00:00 2001 From: Lev Gorodetskiy Date: Tue, 29 Jun 2021 10:54:04 +0300 Subject: [PATCH 5/5] Add atomic jobs --- src/demo_hic_et_nunc/jobs/__init__.py | 0 src/demo_quipuswap/jobs/__init__.py | 0 src/demo_registrydao/jobs/__init__.py | 0 src/demo_tezos_domains/jobs/__init__.py | 0 src/demo_tezos_domains_big_map/jobs/__init__.py | 0 src/demo_tzbtc/jobs/__init__.py | 0 src/demo_tzcolors/jobs/__init__.py | 0 src/dipdup/dipdup.py | 2 +- src/dipdup/scheduler.py | 14 +++++++++----- 9 files changed, 10 insertions(+), 6 deletions(-) create mode 100644 src/demo_hic_et_nunc/jobs/__init__.py create mode 100644 src/demo_quipuswap/jobs/__init__.py create mode 100644 src/demo_registrydao/jobs/__init__.py create mode 100644 src/demo_tezos_domains/jobs/__init__.py create mode 100644 src/demo_tezos_domains_big_map/jobs/__init__.py create mode 100644 src/demo_tzbtc/jobs/__init__.py create mode 100644 src/demo_tzcolors/jobs/__init__.py diff --git a/src/demo_hic_et_nunc/jobs/__init__.py b/src/demo_hic_et_nunc/jobs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/demo_quipuswap/jobs/__init__.py b/src/demo_quipuswap/jobs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/demo_registrydao/jobs/__init__.py b/src/demo_registrydao/jobs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/demo_tezos_domains/jobs/__init__.py b/src/demo_tezos_domains/jobs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/demo_tezos_domains_big_map/jobs/__init__.py b/src/demo_tezos_domains_big_map/jobs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/demo_tzbtc/jobs/__init__.py b/src/demo_tzbtc/jobs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/demo_tzcolors/jobs/__init__.py b/src/demo_tzcolors/jobs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/dipdup/dipdup.py b/src/dipdup/dipdup.py index 9c4ed74ac..3bac76721 100644 --- a/src/dipdup/dipdup.py +++ b/src/dipdup/dipdup.py @@ -187,7 +187,7 @@ async def run(self, reindex: bool, oneshot: bool) -> None: if self._config.jobs and not oneshot: for job_name, job_config in self._config.jobs.items(): - add_job(self._scheduler, job_name, job_config) + add_job(self._ctx, self._scheduler, job_name, job_config) self._scheduler.start() worker_tasks.append(asyncio.create_task(self._index_dispatcher.run(oneshot))) diff --git a/src/dipdup/scheduler.py b/src/dipdup/scheduler.py index a68acf9ea..91ca4062c 100644 --- a/src/dipdup/scheduler.py +++ b/src/dipdup/scheduler.py @@ -5,6 +5,8 @@ from pytz import utc from dipdup.config import JobConfig +from dipdup.context import DipDupContext +from dipdup.utils import in_global_transaction jobstores = { 'default': MemoryJobStore(), @@ -27,17 +29,19 @@ def create_scheduler() -> AsyncIOScheduler: ) -def add_job(scheduler: AsyncIOScheduler, job_name: str, job_config: JobConfig) -> None: - if job_config.atomic: - raise NotImplementedError +def add_job(ctx: DipDupContext, scheduler: AsyncIOScheduler, job_name: str, job_config: JobConfig) -> None: + async def _atomic_wrapper(ctx, args): + async with in_global_transaction(): + await job_config.callback_fn(ctx, args) + trigger = CronTrigger.from_crontab(job_config.crontab) scheduler.add_job( - func=job_config.callback_fn, + func=_atomic_wrapper if job_config.atomic else job_config.callback_fn, id=job_name, name=job_name, trigger=trigger, kwargs=dict( - ctx=None, + ctx=ctx, args=job_config.args, ), )