Merge branch 'main' into drop-pkg-resources-dep

graingert committed Mar 15, 2022
2 parents eba9a2a + 2fffe74 commit 7f66f83
Showing 106 changed files with 1,197 additions and 935 deletions.
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
@@ -1,4 +1,9 @@
 repos:
+  - repo: https://github.com/MarcoGorelli/absolufy-imports
+    rev: v0.3.1
+    hooks:
+      - id: absolufy-imports
+        name: absolufy-imports
   - repo: https://github.com/pycqa/isort
     rev: 5.10.1
     hooks:
@@ -41,3 +46,4 @@ repos:
           - numpy
           - dask
           - tornado
+          - zict
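
Note: the new absolufy-imports hook enforces the absolute-import style that the rest of this commit applies by hand. A minimal illustration of the rewrite, using an import that appears later in this diff:

    # Before: relative import, rewritten/flagged by absolufy-imports
    # from .core import Status

    # After: absolute import, the style enforced repo-wide
    from distributed.core import Status
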
3 changes: 2 additions & 1 deletion continuous_integration/recipes/distributed/meta.yaml
@@ -44,7 +44,8 @@ outputs:
       script: >
         python -m pip install . -vv --no-deps
         --install-option="--with-cython=profile"  # [cython_enabled]
-      track_features: {{ "cythonized-scheduler" if cython_enabled else "" }}
+      track_features:  # [cython_enabled]
+        - cythonized-scheduler  # [cython_enabled]
       entry_points:
         - dask-scheduler = distributed.cli.dask_scheduler:go
         - dask-ssh = distributed.cli.dask_ssh:go
56 changes: 32 additions & 24 deletions distributed/__init__.py
@@ -1,13 +1,13 @@
-from . import config  # isort:skip; load distributed configuration first
-from . import widgets  # isort:skip; load distributed widgets second
+from distributed import config  # isort:skip; load distributed configuration first
+from distributed import widgets  # isort:skip; load distributed widgets second
 
 
 import dask
 from dask.config import config  # type: ignore
 
-from ._version import get_versions
-from .actor import Actor, ActorFuture, BaseActorFuture
-from .client import (
+from distributed._version import get_versions
+from distributed.actor import Actor, ActorFuture, BaseActorFuture
+from distributed.client import (
     Client,
     CompatibleExecutor,
     Executor,
@@ -21,9 +21,9 @@
     performance_report,
     wait,
 )
-from .core import Status, connect, rpc
-from .deploy import Adaptive, LocalCluster, SpecCluster, SSHCluster
-from .diagnostics.plugin import (
+from distributed.core import Status, connect, rpc
+from distributed.deploy import Adaptive, LocalCluster, SpecCluster, SSHCluster
+from distributed.diagnostics.plugin import (
     Environ,
     NannyPlugin,
     PipInstall,
@@ -32,21 +32,29 @@
     UploadFile,
     WorkerPlugin,
 )
-from .diagnostics.progressbar import progress
-from .event import Event
-from .lock import Lock
-from .multi_lock import MultiLock
-from .nanny import Nanny
-from .pubsub import Pub, Sub
-from .queues import Queue
-from .scheduler import Scheduler
-from .security import Security
-from .semaphore import Semaphore
-from .threadpoolexecutor import rejoin
-from .utils import CancelledError, TimeoutError, sync
-from .variable import Variable
-from .worker import Reschedule, Worker, get_client, get_worker, print, secede, warn
-from .worker_client import local_client, worker_client
+from distributed.diagnostics.progressbar import progress
+from distributed.event import Event
+from distributed.lock import Lock
+from distributed.multi_lock import MultiLock
+from distributed.nanny import Nanny
+from distributed.pubsub import Pub, Sub
+from distributed.queues import Queue
+from distributed.scheduler import Scheduler
+from distributed.security import Security
+from distributed.semaphore import Semaphore
+from distributed.threadpoolexecutor import rejoin
+from distributed.utils import CancelledError, TimeoutError, sync
+from distributed.variable import Variable
+from distributed.worker import (
+    Reschedule,
+    Worker,
+    get_client,
+    get_worker,
+    print,
+    secede,
+    warn,
+)
+from distributed.worker_client import local_client, worker_client
 
 
 def __getattr__(name):
@@ -59,7 +67,7 @@ def __getattr__(name):
         return __version__
 
     if name == "__git_revision__":
-        from ._version import get_versions
+        from distributed._version import get_versions
 
         __git_revision__ = get_versions()["full-revisionid"]
         return __git_revision__
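
Note: the __getattr__ function above relies on PEP 562 module-level __getattr__, which lets a package compute attributes such as __version__ lazily on first access instead of at import time. A minimal sketch of the pattern; "mypkg" and its _version helper are hypothetical stand-ins:

    # mypkg/__init__.py -- illustrative PEP 562 lazy module attribute
    def __getattr__(name):
        if name == "__version__":
            # Defer the version lookup until someone actually asks for it.
            from mypkg._version import get_versions  # hypothetical helper

            return get_versions()["version"]
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
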
12 changes: 6 additions & 6 deletions distributed/active_memory_manager.py
@@ -10,13 +10,13 @@
 import dask
 from dask.utils import parse_timedelta
 
-from .core import Status
-from .metrics import time
-from .utils import import_term, log_errors
+from distributed.core import Status
+from distributed.metrics import time
+from distributed.utils import import_term, log_errors
 
-if TYPE_CHECKING:  # pragma: nocover
-    from .client import Client
-    from .scheduler import Scheduler, TaskState, WorkerState
+if TYPE_CHECKING:
+    from distributed.client import Client
+    from distributed.scheduler import Scheduler, TaskState, WorkerState
 
 # Main logger. This is reasonably terse also at DEBUG level.
 logger = logging.getLogger(__name__)
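
Note: the if TYPE_CHECKING: block above makes Client and Scheduler available to annotations without importing them at runtime, which sidesteps import cycles between distributed.client, distributed.scheduler, and this module. A minimal sketch of the pattern; the describe() helper is a hypothetical example:

    from __future__ import annotations

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Seen only by static type checkers; never executed at runtime.
        from distributed.scheduler import Scheduler


    def describe(scheduler: Scheduler) -> str:  # hypothetical helper
        return scheduler.address
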
10 changes: 5 additions & 5 deletions distributed/actor.py
@@ -11,11 +11,11 @@
 
 from tornado.ioloop import IOLoop
 
-from .client import Future
-from .protocol import to_serialize
-from .utils import iscoroutinefunction, sync, thread_state
-from .utils_comm import WrappedKey
-from .worker import get_client, get_worker
+from distributed.client import Future
+from distributed.protocol import to_serialize
+from distributed.utils import iscoroutinefunction, sync, thread_state
+from distributed.utils_comm import WrappedKey
+from distributed.worker import get_client, get_worker
 
 _T = TypeVar("_T")
 
6 changes: 3 additions & 3 deletions distributed/batched.py
@@ -7,8 +7,8 @@
 import dask
 from dask.utils import parse_timedelta
 
-from .core import CommClosedError
-from .metrics import time
+from distributed.core import CommClosedError
+from distributed.metrics import time
 
 logger = logging.getLogger(__name__)
 
@@ -128,7 +128,7 @@ def _background_send(self):
         self.stopped.set()
         self.abort()
 
-    def send(self, *msgs):
+    def send(self, *msgs: dict) -> None:
         """Schedule a message for sending to the other side
 
         This completes quickly and synchronously
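
Note: the new annotation on send() documents that each queued message is a plain dict. A hedged sketch of how a BatchedSend is typically driven, assuming a comm obtained from distributed.comm.connect; exact arguments may differ by version:

    from distributed.batched import BatchedSend
    from distributed.comm import connect


    async def demo(address: str) -> None:  # hypothetical driver
        comm = await connect(address)
        bs = BatchedSend(interval="10ms")
        bs.start(comm)
        # send() only queues the dicts; a background coroutine flushes
        # the accumulated batch roughly every `interval`.
        bs.send({"op": "ping"}, {"op": "status"})
        await bs.close()
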
4 changes: 2 additions & 2 deletions distributed/cfexecutor.py
@@ -6,8 +6,8 @@
 
 from dask.utils import parse_timedelta
 
-from .metrics import time
-from .utils import TimeoutError, sync
+from distributed.metrics import time
+from distributed.utils import TimeoutError, sync
 
 
 @gen.coroutine
65 changes: 35 additions & 30 deletions distributed/client.py
@@ -52,11 +52,11 @@
 from tornado import gen
 from tornado.ioloop import PeriodicCallback
 
-from . import cluster_dump, preloading
-from . import versions as version_module  # type: ignore
-from .batched import BatchedSend
-from .cfexecutor import ClientExecutor
-from .core import (
+from distributed import cluster_dump, preloading
+from distributed import versions as version_module  # type: ignore
+from distributed.batched import BatchedSend
+from distributed.cfexecutor import ClientExecutor
+from distributed.core import (
     CommClosedError,
     ConnectionPool,
     PooledRPCCall,
@@ -65,17 +65,22 @@
     connect,
     rpc,
 )
-from .diagnostics.plugin import NannyPlugin, UploadFile, WorkerPlugin, _get_plugin_name
-from .metrics import time
-from .objects import HasWhat, SchedulerInfo, WhoHas
-from .protocol import to_serialize
-from .protocol.pickle import dumps, loads
-from .publish import Datasets
-from .pubsub import PubSubClientExtension
-from .security import Security
-from .sizeof import sizeof
-from .threadpoolexecutor import rejoin
-from .utils import (
+from distributed.diagnostics.plugin import (
+    NannyPlugin,
+    UploadFile,
+    WorkerPlugin,
+    _get_plugin_name,
+)
+from distributed.metrics import time
+from distributed.objects import HasWhat, SchedulerInfo, WhoHas
+from distributed.protocol import to_serialize
+from distributed.protocol.pickle import dumps, loads
+from distributed.publish import Datasets
+from distributed.pubsub import PubSubClientExtension
+from distributed.security import Security
+from distributed.sizeof import sizeof
+from distributed.threadpoolexecutor import rejoin
+from distributed.utils import (
     All,
     Any,
     CancelledError,
@@ -91,15 +96,15 @@
     sync,
     thread_state,
 )
-from .utils_comm import (
+from distributed.utils_comm import (
     WrappedKey,
     gather_from_workers,
     pack_data,
     retry_operation,
     scatter_to_workers,
     unpack_remotedata,
 )
-from .worker import get_client, get_worker, secede
+from distributed.worker import get_client, get_worker, secede
 
 logger = logging.getLogger(__name__)
 
@@ -1016,7 +1021,7 @@ def dashboard_link(self):
         return format_dashboard_link(host, port)
 
     def _get_scheduler_info(self):
-        from .scheduler import Scheduler
+        from distributed.scheduler import Scheduler
 
         if (
             self.cluster
@@ -1152,7 +1157,7 @@ async def _start(self, timeout=no_default, **kwargs):
                 except (ValueError, KeyError):  # JSON file not yet flushed
                     await asyncio.sleep(0.01)
         elif self._start_arg is None:
-            from .deploy import LocalCluster
+            from distributed.deploy import LocalCluster
 
             try:
                 self.cluster = await LocalCluster(
@@ -3783,7 +3788,7 @@ async def _profile(
             plot = True
 
         if plot:
-            from . import profile
+            from distributed import profile
 
             data = profile.plot_data(state)
             figure, source = profile.plot_figure(data, sizing_mode="stretch_both")
@@ -4332,17 +4337,17 @@ def start_ipython_workers(
             magic_names = [magic_names]
 
         if "IPython" in sys.modules:
-            from ._ipython_utils import register_remote_magic
+            from distributed._ipython_utils import register_remote_magic
 
             register_remote_magic()
         if magic_names:
-            from ._ipython_utils import register_worker_magic
+            from distributed._ipython_utils import register_worker_magic
 
             for worker, magic_name in zip(workers, magic_names):
                 connection_info = info_dict[worker]
                 register_worker_magic(connection_info, magic_name)
         if qtconsole:
-            from ._ipython_utils import connect_qtconsole
+            from distributed._ipython_utils import connect_qtconsole
 
             for worker, connection_info in info_dict.items():
                 name = "dask-" + worker.replace(":", "-").replace("/", "-")
@@ -4397,11 +4402,11 @@ def start_ipython_scheduler(
         else:
             magic_name = None
         if magic_name:
-            from ._ipython_utils import register_worker_magic
+            from distributed._ipython_utils import register_worker_magic
 
             register_worker_magic(info, magic_name)
         if qtconsole:
-            from ._ipython_utils import connect_qtconsole
+            from distributed._ipython_utils import connect_qtconsole
 
             connect_qtconsole(info, name="dask-scheduler", extra_args=qtconsole_args)
         return info
@@ -4517,10 +4522,10 @@ async def _get_task_stream(
     ):
         msgs = await self.scheduler.get_task_stream(start=start, stop=stop, count=count)
         if plot:
-            from .diagnostics.task_stream import rectangles
+            from distributed.diagnostics.task_stream import rectangles
 
             rects = rectangles(msgs)
-            from .dashboard.components.scheduler import task_stream_figure
+            from distributed.dashboard.components.scheduler import task_stream_figure
 
             source, figure = task_stream_figure(sizing_mode="stretch_both")
             source.data.update(rects)
@@ -4755,7 +4760,7 @@ def unregister_worker_plugin(self, name, nanny=None):
     @property
     def amm(self):
         """Convenience accessors for the :doc:`active_memory_manager`"""
-        from .active_memory_manager import AMMClientProxy
+        from distributed.active_memory_manager import AMMClientProxy
 
         return AMMClientProxy(self)
 
@@ -4980,7 +4985,7 @@ def update(self, futures):
         """Add multiple futures to the collection.
 
         The added futures will emit from the iterator once they finish"""
-        from .actor import BaseActorFuture
+        from distributed.actor import BaseActorFuture
 
         with self.lock:
             for f in futures:
16 changes: 8 additions & 8 deletions distributed/comm/__init__.py
@@ -1,4 +1,4 @@
-from .addressing import (
+from distributed.comm.addressing import (
     get_address_host,
     get_address_host_port,
     get_local_address_for,
@@ -9,25 +9,25 @@
     unparse_address,
     unparse_host_port,
 )
-from .core import Comm, CommClosedError, connect, listen
-from .registry import backends
-from .utils import get_tcp_server_address, get_tcp_server_addresses
+from distributed.comm.core import Comm, CommClosedError, connect, listen
+from distributed.comm.registry import backends
+from distributed.comm.utils import get_tcp_server_address, get_tcp_server_addresses
 
 
 def _register_transports():
     import dask.config
 
-    from . import inproc, ws
+    from distributed.comm import inproc, ws
 
     tcp_backend = dask.config.get("distributed.comm.tcp.backend")
 
     if tcp_backend == "asyncio":
-        from . import asyncio_tcp
+        from distributed.comm import asyncio_tcp
 
         backends["tcp"] = asyncio_tcp.TCPBackend()
         backends["tls"] = asyncio_tcp.TLSBackend()
     elif tcp_backend == "tornado":
-        from . import tcp
+        from distributed.comm import tcp
 
         backends["tcp"] = tcp.TCPBackend()
         backends["tls"] = tcp.TLSBackend()
@@ -38,7 +38,7 @@ def _register_transports():
         )
 
     try:
-        from . import ucx
+        from distributed.comm import ucx
     except ImportError:
         pass
 
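
Note: _register_transports() above selects the TCP/TLS comm implementation from the distributed.comm.tcp.backend configuration value, handling "asyncio" and "tornado". A hedged usage sketch; the value must be set before distributed is imported, since registration happens at import time:

    import dask

    # Choose the asyncio-based TCP backend; "tornado" is the other
    # value handled by _register_transports().
    dask.config.set({"distributed.comm.tcp.backend": "asyncio"})

    import distributed  # noqa: E402  (transports register on import)
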
4 changes: 2 additions & 2 deletions distributed/comm/addressing.py
@@ -4,8 +4,8 @@
 
 import dask
 
-from ..utils import get_ip_interface
-from . import registry
+from distributed.comm import registry
+from distributed.utils import get_ip_interface
 
 
 def parse_address(addr: str, strict: bool = False) -> tuple[str, str]:
