Skip to content

Commit

Permalink
use run_and_close_tornado in the CLI and Nanny
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed Jul 5, 2023
1 parent ab6cbd9 commit 62b9e7d
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 5 deletions.
3 changes: 2 additions & 1 deletion distributed/cli/dask_scheduler.py
Expand Up @@ -18,6 +18,7 @@
enable_proctitle_on_children,
enable_proctitle_on_current,
)
from distributed.utils import run_and_close_tornado

logger = logging.getLogger("distributed.scheduler")

Expand Down Expand Up @@ -246,7 +247,7 @@ async def wait_for_signals_and_close():
logger.info("Stopped scheduler at %r", scheduler.address)

try:
asyncio.run(run())
run_and_close_tornado(run)
finally:
logger.info("End scheduler")

Expand Down
3 changes: 2 additions & 1 deletion distributed/cli/dask_spec.py
Expand Up @@ -8,6 +8,7 @@
import yaml

from distributed.deploy.spec import run_spec
from distributed.utils import run_and_close_tornado


@click.command(name="spec", context_settings=dict(ignore_unknown_options=True))
Expand Down Expand Up @@ -39,7 +40,7 @@ async def run():
except KeyboardInterrupt:
await asyncio.gather(*(w.close() for w in servers.values()))

asyncio.run(run())
run_and_close_tornado(run)


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions distributed/cli/dask_worker.py
Expand Up @@ -27,7 +27,7 @@
enable_proctitle_on_children,
enable_proctitle_on_current,
)
from distributed.utils import import_term, parse_ports
from distributed.utils import import_term, parse_ports, run_and_close_tornado

logger = logging.getLogger("distributed.dask_worker")

Expand Down Expand Up @@ -443,7 +443,7 @@ async def wait_for_signals_and_close():
[task.result() for task in done]

try:
asyncio.run(run())
run_and_close_tornado(run)
except (TimeoutError, asyncio.TimeoutError):
# We already log the exception in nanny / worker. Don't do it again.
if not signal_fired:
Expand Down
3 changes: 2 additions & 1 deletion distributed/nanny.py
Expand Up @@ -51,6 +51,7 @@
json_load_robust,
log_errors,
parse_ports,
run_and_close_tornado,
silence_logging_cmgr,
wait_for,
)
Expand Down Expand Up @@ -996,7 +997,7 @@ def close_stop_q() -> None:
if silence_logs:
logger.setLevel(silence_logs)

asyncio.run(run())
run_and_close_tornado(run)


def _get_env_variables(config_key: str) -> dict[str, str]:
Expand Down

0 comments on commit 62b9e7d

Please sign in to comment.