Skip to content

Commit

Permalink
configure asyncio loop using loop_factory kwarg rather than using the…
Browse files Browse the repository at this point in the history
… set_event_loop_policy
  • Loading branch information
graingert committed Jul 5, 2023
1 parent 05f9964 commit ab6cbd9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
22 changes: 11 additions & 11 deletions distributed/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging.config
import os
import sys
from collections.abc import Callable
from typing import Any

import yaml
Expand Down Expand Up @@ -177,7 +178,7 @@ def initialize_logging(config: dict[Any, Any]) -> None:
_initialize_logging_old_style(config)


def initialize_event_loop(config: dict[Any, Any]) -> None:
def get_loop_factory() -> Callable[[], asyncio.AbstractEventLoop] | None:
event_loop = dask.config.get("distributed.admin.event-loop")
if event_loop == "uvloop":
uvloop = import_required(
Expand All @@ -189,19 +190,18 @@ def initialize_event_loop(config: dict[Any, Any]) -> None:
" conda install uvloop\n"
" pip install uvloop",
)
uvloop.install()
elif event_loop in {"asyncio", "tornado"}:
return uvloop.new_event_loop
if event_loop in {"asyncio", "tornado"}:
if sys.platform == "win32":
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# WindowsProactorEventLoop is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
# https://github.com/tornadoweb/tornado/issues/2608
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
else:
raise ValueError(
"Expected distributed.admin.event-loop to be in ('asyncio', 'tornado', 'uvloop'), got %s"
% dask.config.get("distributed.admin.event-loop")
)
return asyncio.WindowsSelectorEventLoop
return None
raise ValueError(
"Expected distributed.admin.event-loop to be in ('asyncio', 'tornado', 'uvloop'), got %s"
% dask.config.get("distributed.admin.event-loop")
)


initialize_logging(dask.config.config)
initialize_event_loop(dask.config.config)
5 changes: 4 additions & 1 deletion distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import psutil
import tblib.pickling_support

from distributed.compatibility import asyncio_run
from distributed.config import get_loop_factory

try:
import resource
except ImportError:
Expand Down Expand Up @@ -497,7 +500,7 @@ async def inner_fn():
return await async_fn(*args, **kwargs)

try:
return asyncio.run(inner_fn())
return asyncio_run(inner_fn(), loop_factory=get_loop_factory())
finally:
if tornado_loop is not None:
tornado_loop.close(all_fds=True)
Expand Down

0 comments on commit ab6cbd9

Please sign in to comment.