Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions distributed/deploy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
from __future__ import annotations

from contextlib import suppress

from distributed.deploy.adaptive import Adaptive
from distributed.deploy.cluster import Cluster
from distributed.deploy.local import LocalCluster
from distributed.deploy.spec import ProcessInterface, SpecCluster
from distributed.deploy.ssh import SSHCluster
from distributed.deploy.subprocess import SubprocessCluster

with suppress(ImportError):
from distributed.deploy.ssh import SSHCluster
6 changes: 1 addition & 5 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from tornado.ioloop import IOLoop

import dask.config
from dask.utils import _deprecated, format_bytes, parse_timedelta, typename
from dask.utils import format_bytes, parse_timedelta, typename
from dask.widgets import get_template

from distributed.compatibility import PeriodicCallback
Expand Down Expand Up @@ -348,10 +348,6 @@ def get_logs(self, cluster=True, scheduler=True, workers=True):
self._get_logs, cluster=cluster, scheduler=scheduler, workers=workers
)

@_deprecated(use_instead="get_logs")
def logs(self, *args, **kwargs):
return self.get_logs(*args, **kwargs)

def get_client(self):
"""Return client for the cluster

Expand Down
29 changes: 10 additions & 19 deletions distributed/deploy/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ class LocalCluster(SpecCluster):
Use a falsey value like False or None for no change.
host: string
Host address on which the scheduler will listen, defaults to only localhost
ip: string
Deprecated. See ``host`` above.
dashboard_address: str
Address on which to listen for the Bokeh diagnostics server like
'localhost:8787' or '0.0.0.0:8787'. Defaults to ':8787'.
Expand All @@ -69,8 +67,6 @@ class LocalCluster(SpecCluster):
Address on which to listen for the Bokeh worker diagnostics server like
'localhost:8787' or '0.0.0.0:8787'. Defaults to None which disables the dashboard.
Use ':0' for a random port.
diagnostics_port: int
Deprecated. See dashboard_address.
asynchronous: bool (False by default)
Set to True if using this cluster within async/await functions or within
Tornado gen.coroutines. This should remain False for normal use.
Expand Down Expand Up @@ -120,14 +116,13 @@ def __init__(
threads_per_worker=None,
processes=None,
loop=None,
start=None,
start=None, # deprecated
host=None,
ip=None,
ip=None, # deprecated
scheduler_port=0,
silence_logs=logging.WARN,
dashboard_address=":8787",
worker_dashboard_address=None,
diagnostics_port=None,
services=None,
worker_services=None,
service_kwargs=None,
Expand All @@ -142,23 +137,19 @@ def __init__(
**worker_kwargs,
):
if ip is not None:
# In the future we should warn users about this move
# warnings.warn("The ip keyword has been moved to host")
host = ip

if diagnostics_port is not None:
warnings.warn(
"diagnostics_port has been deprecated. "
"Please use `dashboard_address=` instead"
"The `ip` parameter has been deprecated. Please use `host` instead",
DeprecationWarning,
stacklevel=2,
)
dashboard_address = diagnostics_port
host = ip

if threads_per_worker == 0:
if start is not None:
warnings.warn(
"Setting `threads_per_worker` to 0 has been deprecated. "
"Please set to None or to a specific int."
"The `start` parameter has been deprecated and has no effect.",
DeprecationWarning,
stacklevel=2,
)
threads_per_worker = None

if "dashboard" in worker_kwargs:
warnings.warn(
Expand Down
48 changes: 2 additions & 46 deletions distributed/deploy/old_ssh.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations

import datetime
import logging
import os
import sys
import traceback
import warnings
from queue import Queue
from threading import Thread
from time import sleep
Expand Down Expand Up @@ -330,7 +330,7 @@ def __init__(
scheduler_port,
worker_addrs,
nthreads=0,
n_workers=None,
n_workers=1,
ssh_username=None,
ssh_port=22,
ssh_private_key=None,
Expand All @@ -342,49 +342,23 @@ def __init__(
nanny_port=None,
remote_dask_worker="distributed.cli.dask_worker",
local_directory=None,
**kwargs,
):
self.scheduler_addr = scheduler_addr
self.scheduler_port = scheduler_port
self.nthreads = nthreads
nprocs = kwargs.pop("nprocs", None)
if kwargs:
raise TypeError(
f"__init__() got an unexpected keyword argument {', '.join(kwargs.keys())}"
)
if nprocs is not None and n_workers is not None:
raise ValueError(
"Both nprocs and n_workers were specified. Use n_workers only."
)
elif nprocs is not None:
warnings.warn(
"The nprocs argument will be removed in a future release. It has been "
"renamed to n_workers.",
FutureWarning,
)
n_workers = nprocs
elif n_workers is None:
n_workers = 1

self.n_workers = n_workers

self.ssh_username = ssh_username
self.ssh_port = ssh_port
self.ssh_private_key = ssh_private_key

self.nohost = nohost

self.remote_python = remote_python

self.memory_limit = memory_limit
self.worker_port = worker_port
self.nanny_port = nanny_port
self.remote_dask_worker = remote_dask_worker
self.local_directory = local_directory

# Generate a universal timestamp to use for log files
import datetime

if logdir is not None:
logdir = os.path.join(
logdir,
Expand Down Expand Up @@ -420,24 +394,6 @@ def __init__(
def _start(self):
pass

@property
def nprocs(self):
warnings.warn(
"The nprocs attribute will be removed in a future release. It has been "
"renamed to n_workers.",
FutureWarning,
)
return self.n_workers

@nprocs.setter
def nprocs(self, value):
warnings.warn(
"The nprocs attribute will be removed in a future release. It has been "
"renamed to n_workers.",
FutureWarning,
)
self.n_workers = value

@property
def scheduler_address(self):
return f"{self.scheduler_addr}:{self.scheduler_port}"
Expand Down
51 changes: 7 additions & 44 deletions distributed/deploy/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,58 +71,28 @@ def __init__( # type: ignore[no-untyped-def]
address: str,
connect_options: dict,
kwargs: dict,
worker_module="deprecated",
worker_class="distributed.Nanny",
remote_python=None,
loop=None,
name=None,
):
super().__init__()

if worker_module != "deprecated":
raise ValueError(
"worker_module has been deprecated in favor of worker_class. "
"Please specify a Python class rather than a CLI module."
if loop is not None:
warnings.warn(
"The `loop` parameter has been deprecated and has no effect.",
DeprecationWarning,
stacklevel=2,
)

self.address = address
self.scheduler = scheduler
self.worker_class = worker_class
self.connect_options = connect_options
self.kwargs = copy.copy(kwargs)
self.name = name
self.remote_python = remote_python
if kwargs.get("nprocs") is not None and kwargs.get("n_workers") is not None:
raise ValueError(
"Both nprocs and n_workers were specified. Use n_workers only."
)
elif kwargs.get("nprocs") is not None:
warnings.warn(
"The nprocs argument will be removed in a future release. It has been "
"renamed to n_workers.",
FutureWarning,
)
self.n_workers = self.kwargs.pop("nprocs", 1)
else:
self.n_workers = self.kwargs.pop("n_workers", 1)

@property
def nprocs(self):
warnings.warn(
"The nprocs attribute will be removed in a future release. It has been "
"renamed to n_workers.",
FutureWarning,
)
return self.n_workers

@nprocs.setter
def nprocs(self, value):
warnings.warn(
"The nprocs attribute will be removed in a future release. It has been "
"renamed to n_workers.",
FutureWarning,
)
self.n_workers = value
self.kwargs = copy.copy(kwargs)
self.n_workers = self.kwargs.pop("n_workers", 1)

async def start(self):
try:
Expand Down Expand Up @@ -291,7 +261,6 @@ def SSHCluster(
connect_options: dict | list[dict] | None = None,
worker_options: dict | None = None,
scheduler_options: dict | None = None,
worker_module: str = "deprecated",
worker_class: str = "distributed.Nanny",
remote_python: str | list[str] | None = None,
**kwargs: Any,
Expand Down Expand Up @@ -387,12 +356,6 @@ def SSHCluster(
worker_options = worker_options or {}
scheduler_options = scheduler_options or {}

if worker_module != "deprecated":
raise ValueError(
"worker_module has been deprecated in favor of worker_class. "
"Please specify a Python class rather than a CLI module."
)

if set(kwargs) & old_cluster_kwargs:
from distributed.deploy.old_ssh import SSHCluster as OldSSHCluster

Expand Down
16 changes: 0 additions & 16 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -1060,22 +1060,6 @@ async def test_repr(memory_limit):
assert "memory" not in text


@gen_test()
async def test_threads_per_worker_set_to_0():
with pytest.warns(
Warning, match="Setting `threads_per_worker` to 0 has been deprecated."
):
async with LocalCluster(
n_workers=2,
processes=False,
threads_per_worker=0,
asynchronous=True,
dashboard_address=":0",
) as cluster:
assert len(cluster.workers) == 2
assert all(w.state.nthreads < CPU_COUNT for w in cluster.workers.values())


@pytest.mark.parametrize("temporary", [True, False])
@gen_test()
async def test_capture_security(temporary):
Expand Down
47 changes: 0 additions & 47 deletions distributed/deploy/tests/test_old_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,50 +36,3 @@ def test_cluster(loop):
while len(e.ncores()) != 3:
sleep(0.01)
assert time() < start + 5


def test_old_ssh_nprocs_renamed_to_n_workers():
with pytest.warns(FutureWarning, match="renamed to n_workers"):
with SSHCluster(
scheduler_addr="127.0.0.1",
scheduler_port=8687,
worker_addrs=["127.0.0.1", "127.0.0.1"],
nprocs=2,
) as c:
assert c.n_workers == 2


def test_nprocs_attribute_is_deprecated():
with SSHCluster(
scheduler_addr="127.0.0.1",
scheduler_port=8687,
worker_addrs=["127.0.0.1", "127.0.0.1"],
) as c:
assert c.n_workers == 1
with pytest.warns(FutureWarning, match="renamed to n_workers"):
assert c.nprocs == 1
with pytest.warns(FutureWarning, match="renamed to n_workers"):
c.nprocs = 3

assert c.n_workers == 3


def test_old_ssh_n_workers_with_nprocs_is_an_error():
with pytest.raises(ValueError, match="Both nprocs and n_workers"):
SSHCluster(
scheduler_addr="127.0.0.1",
scheduler_port=8687,
worker_addrs=(),
nprocs=2,
n_workers=2,
)


def test_extra_kwargs_is_an_error():
with pytest.raises(TypeError, match="unexpected keyword argument"):
SSHCluster(
scheduler_addr="127.0.0.1",
scheduler_port=8687,
worker_addrs=["127.0.0.1", "127.0.0.1"],
unknown_kwarg=None,
)
8 changes: 0 additions & 8 deletions distributed/deploy/tests/test_spec_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,14 +304,6 @@ async def test_get_logs():
assert set(logs) == {w}


@gen_test()
async def test_logs_deprecated():
async with SpecCluster(asynchronous=True, scheduler=scheduler) as cluster:
with pytest.warns(FutureWarning, match="get_logs"):
logs = await cluster.logs()
assert logs["Scheduler"]


@gen_test()
async def test_scheduler_info():
async with SpecCluster(
Expand Down
Loading
Loading