Skip to content

Commit

Permalink
Merge branch 'main' into WSMR/clustered_transfers_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jun 14, 2022
2 parents 8136bae + 344868a commit 51a3ba9
Show file tree
Hide file tree
Showing 36 changed files with 3,470 additions and 2,639 deletions.
3 changes: 2 additions & 1 deletion distributed/active_memory_manager.py
Expand Up @@ -416,8 +416,9 @@ def run(
) -> SuggestionGenerator:
"""This method is invoked by the ActiveMemoryManager every few seconds, or
whenever the user invokes ``client.amm.run_once``.
It is an iterator that must emit
:class:`~distributed.active_memory_manager.Suggestion`s:
:class:`~distributed.active_memory_manager.Suggestion` objects:
- ``Suggestion("replicate", <TaskState>)``
- ``Suggestion("replicate", <TaskState>, {subset of potential workers to replicate to})``
Expand Down
20 changes: 8 additions & 12 deletions distributed/cli/tests/test_dask_scheduler.py
Expand Up @@ -66,7 +66,7 @@ def test_no_dashboard(loop):
def test_dashboard(loop):
pytest.importorskip("bokeh")

with popen(["dask-scheduler"], flush_output=False) as proc:
with popen(["dask-scheduler"], capture_output=True) as proc:
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())

Expand Down Expand Up @@ -99,24 +99,22 @@ def test_dashboard_non_standard_ports(loop):
pytest.importorskip("bokeh")

with popen(
["dask-scheduler", "--port", "23448", "--dashboard-address", ":24832"],
flush_output=False,
["dask-scheduler", "--port", "3448", "--dashboard-address", ":4832"]
) as proc:
line = wait_for_log_line(b"dashboard at", proc.stdout)
with Client("127.0.0.1:23448", loop=loop) as c:
with Client("127.0.0.1:3448", loop=loop) as c:
pass

start = time()
while True:
try:
response = requests.get("http://localhost:24832/status/")
response = requests.get("http://localhost:4832/status/")
assert response.ok
break
except Exception:
sleep(0.1)
assert time() < start + 20
with pytest.raises(Exception):
requests.get("http://localhost:24832/status/")
requests.get("http://localhost:4832/status/")


@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
Expand Down Expand Up @@ -209,10 +207,8 @@ def check_pidfile(proc, pidfile):
def test_scheduler_port_zero(loop):
with tmpfile() as fn:
with popen(
["dask-scheduler", "--no-dashboard", "--scheduler-file", fn, "--port", "0"],
flush_output=False,
) as proc:
line = wait_for_log_line(b"dashboard at", proc.stdout)
["dask-scheduler", "--no-dashboard", "--scheduler-file", fn, "--port", "0"]
):
with Client(scheduler_file=fn, loop=loop) as c:
assert c.scheduler.port
assert c.scheduler.port != 8786
Expand All @@ -222,7 +218,7 @@ def test_dashboard_port_zero(loop):
pytest.importorskip("bokeh")
with popen(
["dask-scheduler", "--dashboard-address", ":0"],
flush_output=False,
capture_output=True,
) as proc:
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())
Expand Down
4 changes: 2 additions & 2 deletions distributed/cli/tests/test_dask_spec.py
Expand Up @@ -78,15 +78,15 @@ def test_errors():
"--spec-file",
"foo.yaml",
],
flush_output=False,
capture_output=True,
) as proc:
line = proc.stdout.readline().decode()
assert "exactly one" in line
assert "--spec" in line and "--spec-file" in line

with popen(
[sys.executable, "-m", "distributed.cli.dask_spec"],
flush_output=False,
capture_output=True,
) as proc:
line = proc.stdout.readline().decode()
assert "exactly one" in line
Expand Down
4 changes: 2 additions & 2 deletions distributed/cli/tests/test_dask_ssh.py
Expand Up @@ -23,7 +23,7 @@ def test_version_option():
def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
with popen(
["dask-ssh", "--nprocs=2", "--nohost", "localhost"],
flush_output=False,
capture_output=True,
) as proc:
with Client("tcp://127.0.0.1:8786", timeout="15 seconds", loop=loop) as c:
c.wait_for_workers(2, timeout="15 seconds")
Expand All @@ -36,6 +36,6 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
def test_ssh_cli_nworkers_with_nprocs_is_an_error():
with popen(
["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"],
flush_output=False,
capture_output=True,
) as proc:
wait_for_log_line(b"Both --nprocs and --nworkers", proc.stdout, max_lines=15)
14 changes: 7 additions & 7 deletions distributed/cli/tests/test_dask_worker.py
Expand Up @@ -244,7 +244,7 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s):
"9686:9687",
"--no-dashboard",
],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"Not enough ports in range", worker.stdout, max_lines=100)

Expand Down Expand Up @@ -278,14 +278,14 @@ async def test_no_nanny(c, s):
async def test_reconnect_deprecated(c, s):
with popen(
["dask-worker", s.address, "--reconnect"],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"`--reconnect` option has been removed", worker.stdout)
assert worker.wait() == 1

with popen(
["dask-worker", s.address, "--no-reconnect"],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"flag is deprecated, and will be removed", worker.stdout)
await c.wait_for_workers(1)
Expand Down Expand Up @@ -361,7 +361,7 @@ def test_scheduler_address_env(loop, monkeypatch):
async def test_nworkers_requires_nanny(s):
with popen(
["dask-worker", s.address, "--nworkers=2", "--no-nanny"],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"Failed to launch worker", worker.stdout, max_lines=15)

Expand Down Expand Up @@ -400,7 +400,7 @@ async def test_nworkers_expands_name(c, s):
async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
with popen(
["dask-worker", s.address, "--nprocs=2"],
flush_output=False,
capture_output=True,
) as worker:
await c.wait_for_workers(2)
wait_for_log_line(b"renamed to --nworkers", worker.stdout, max_lines=15)
Expand All @@ -410,7 +410,7 @@ async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
async def test_worker_cli_nworkers_with_nprocs_is_an_error(s):
with popen(
["dask-worker", s.address, "--nprocs=2", "--nworkers=2"],
flush_output=False,
capture_output=True,
) as worker:
wait_for_log_line(b"Both --nprocs and --nworkers", worker.stdout, max_lines=15)

Expand Down Expand Up @@ -708,7 +708,7 @@ def test_error_during_startup(monkeypatch, nanny):
"--port",
scheduler_port,
],
flush_output=False,
capture_output=True,
) as scheduler:
start = time()
# Wait for the scheduler to be up
Expand Down
8 changes: 4 additions & 4 deletions distributed/dashboard/components/scheduler.py
Expand Up @@ -984,8 +984,8 @@ def update(self):
for ws in workers:
x_read.append(ws.metrics["read_bytes"])
x_write.append(ws.metrics["write_bytes"])
x_read_disk.append(ws.metrics["read_bytes_disk"])
x_write_disk.append(ws.metrics["write_bytes_disk"])
x_read_disk.append(ws.metrics.get("read_bytes_disk", 0))
x_write_disk.append(ws.metrics.get("write_bytes_disk", 0))

if self.scheduler.workers:
self.bandwidth.x_range.end = max(
Expand Down Expand Up @@ -1173,8 +1173,8 @@ def get_data(self):
write_bytes += ws.metrics["write_bytes"]
cpu += ws.metrics["cpu"]
memory += ws.metrics["memory"]
read_bytes_disk += ws.metrics["read_bytes_disk"]
write_bytes_disk += ws.metrics["write_bytes_disk"]
read_bytes_disk += ws.metrics.get("read_bytes_disk", 0)
write_bytes_disk += ws.metrics.get("write_bytes_disk", 0)
time += ws.metrics["time"]

result = {
Expand Down
44 changes: 28 additions & 16 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Expand Up @@ -50,6 +50,7 @@
from distributed.metrics import time
from distributed.utils import format_dashboard_link
from distributed.utils_test import dec, div, gen_cluster, get_cert, inc, slowinc
from distributed.worker import Worker

# Imported from distributed.dashboard.utils
scheduler.PROFILING = False # type: ignore
Expand Down Expand Up @@ -504,22 +505,33 @@ async def test_WorkerNetworkBandwidth(c, s, a, b):
async def test_WorkerNetworkBandwidth_metrics(c, s, a, b):
# Disable system monitor periodic callback to allow us to manually control
# when it is called below
a.periodic_callbacks["monitor"].stop()
b.periodic_callbacks["monitor"].stop()

# Update worker system monitors and send updated metrics to the scheduler
a.monitor.update()
b.monitor.update()
await asyncio.gather(a.heartbeat(), b.heartbeat())

nb = WorkerNetworkBandwidth(s)
nb.update()

for idx, ws in enumerate(s.workers.values()):
assert ws.metrics["read_bytes"] == nb.source.data["x_read"][idx]
assert ws.metrics["write_bytes"] == nb.source.data["x_write"][idx]
assert ws.metrics["read_bytes_disk"] == nb.source.data["x_read_disk"][idx]
assert ws.metrics["write_bytes_disk"] == nb.source.data["x_write_disk"][idx]
with dask.config.set({"distributed.admin.system-monitor.disk": False}):
async with Worker(s.address) as w:
a.periodic_callbacks["monitor"].stop()
b.periodic_callbacks["monitor"].stop()
w.periodic_callbacks["monitor"].stop()

# Update worker system monitors and send updated metrics to the scheduler
a.monitor.update()
b.monitor.update()
w.monitor.update()
await asyncio.gather(a.heartbeat(), b.heartbeat())
await asyncio.gather(a.heartbeat(), b.heartbeat(), w.heartbeat())

nb = WorkerNetworkBandwidth(s)
nb.update()

for idx, ws in enumerate(s.workers.values()):
assert ws.metrics["read_bytes"] == nb.source.data["x_read"][idx]
assert ws.metrics["write_bytes"] == nb.source.data["x_write"][idx]
assert (
ws.metrics.get("read_bytes_disk", 0)
== nb.source.data["x_read_disk"][idx]
)
assert (
ws.metrics.get("write_bytes_disk", 0)
== nb.source.data["x_write_disk"][idx]
)


@gen_cluster(client=True)
Expand Down
48 changes: 43 additions & 5 deletions distributed/deploy/tests/test_local.py
@@ -1,7 +1,6 @@
import asyncio
import subprocess
import sys
import unittest
from threading import Lock
from time import sleep
from urllib.parse import urlparse
Expand All @@ -15,7 +14,6 @@
from distributed import Client, LocalCluster, Nanny, Worker, get_client
from distributed.compatibility import LINUX
from distributed.core import Status
from distributed.deploy.utils_test import ClusterTest
from distributed.metrics import time
from distributed.system import MEMORY_LIMIT
from distributed.utils import TimeoutError, sync
Expand Down Expand Up @@ -170,9 +168,49 @@ def test_transports_tcp_port():
assert e.submit(inc, 4).result() == 5


class LocalTest(ClusterTest, unittest.TestCase):
Cluster = LocalCluster # type: ignore
kwargs = {"silence_logs": False, "dashboard_address": ":0", "processes": False}
def test_cores(loop):
with LocalCluster(
n_workers=2,
scheduler_port=0,
silence_logs=False,
dashboard_address=":0",
processes=False,
loop=loop,
) as cluster, Client(cluster.scheduler_address, loop=loop) as client:
client.scheduler_info()
assert len(client.nthreads()) == 2


def test_submit(loop):
with LocalCluster(
n_workers=2,
scheduler_port=0,
silence_logs=False,
dashboard_address=":0",
processes=False,
loop=loop,
) as cluster, Client(cluster.scheduler_address, loop=loop) as client:
future = client.submit(lambda x: x + 1, 1)
assert future.result() == 2


def test_context_manager(loop):
with LocalCluster(
silence_logs=False, dashboard_address=":0", processes=False, loop=loop
) as c, Client(c) as e:
assert e.nthreads()


def test_no_workers_sync(loop):
with LocalCluster(
n_workers=0,
scheduler_port=0,
silence_logs=False,
dashboard_address=":0",
processes=False,
loop=loop,
):
pass


def test_Client_with_local(loop):
Expand Down
38 changes: 0 additions & 38 deletions distributed/deploy/utils_test.py

This file was deleted.

2 changes: 1 addition & 1 deletion distributed/diagnostics/plugin.py
Expand Up @@ -334,7 +334,7 @@ def __init__(self, filepath):

async def setup(self, worker):
response = await worker.upload_file(
comm=None, filename=self.filename, data=self.data, load=True
filename=self.filename, data=self.data, load=True
)
assert len(self.data) == response["nbytes"]

Expand Down
3 changes: 3 additions & 0 deletions distributed/distributed-schema.yaml
Expand Up @@ -1039,6 +1039,9 @@ properties:
interval:
type: string
description: Polling time to query cpu/memory statistics default 500ms
disk:
type: boolean
description: Should we include disk metrics? (they can cause issues in some systems)

rmm:
type: object
Expand Down
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Expand Up @@ -280,6 +280,7 @@ distributed:
pdb-on-err: False # enter debug mode on scheduling error
system-monitor:
interval: 500ms
disk: true
event-loop: tornado
rmm:
pool-size: null
10 changes: 4 additions & 6 deletions distributed/nanny.py
Expand Up @@ -809,12 +809,10 @@ def _run(
try:
os.environ.update(env)
dask.config.set(config)
try:
from dask.multiprocessing import initialize_worker_process
except ImportError: # old Dask version
pass
else:
initialize_worker_process()

from dask.multiprocessing import default_initializer

default_initializer()

if silence_logs:
logger.setLevel(silence_logs)
Expand Down

0 comments on commit 51a3ba9

Please sign in to comment.