diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index baf54146..2ae73e64 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -1,4 +1,5 @@ -from contextlib import contextmanager, suppress +from contextlib import closing, contextmanager, suppress +import asyncio import logging import math import os @@ -22,6 +23,18 @@ from distributed.scheduler import Scheduler from distributed.security import Security + +def _maybe_get_running_loop(): + try: + return asyncio.get_running_loop() + except RuntimeError: + pass + + +async def _acall(fn): + return fn() + + logger = logging.getLogger(__name__) job_parameters = """ @@ -557,7 +570,14 @@ def __init__( "-" + str(i) for i in range(self._job_kwargs["processes"]) ] - self._dummy_job # trigger property to ensure that the job is valid + def _trigger_property(): + self._dummy_job # trigger property to ensure that the job is valid + + if _maybe_get_running_loop() is not None: + _trigger_property() + else: + with closing(asyncio.new_event_loop()) as loop: + loop.run_until_complete(_acall(_trigger_property)) super().__init__( scheduler=scheduler,