Merge branch 'main' into private-decide-worker
hendrikmakait committed Feb 26, 2024
2 parents 00e6208 + fcfa7bc commit 8aa7675
Showing 45 changed files with 721 additions and 257 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-pre-commit.yml
@@ -15,4 +15,4 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: '3.9'
- uses: pre-commit/action@v3.0.0
- uses: pre-commit/action@v3.0.1
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
@@ -154,7 +154,7 @@ jobs:
# Increase this value to reset cache if
# continuous_integration/environment-${{ matrix.environment }}.yaml has not
# changed. See also same variable in .pre-commit-config.yaml
CACHE_NUMBER: 0
CACHE_NUMBER: 1
id: cache

- name: Update environment
2 changes: 1 addition & 1 deletion .github/workflows/update-gpuci.yaml
@@ -54,7 +54,7 @@ jobs:
regex: false

- name: Create Pull Request
uses: peter-evans/create-pull-request@v5
uses: peter-evans/create-pull-request@v6
if: ${{ env.UCX_PY_VER != env.NEW_UCX_PY_VER }} # make sure new ucx-py nightlies are available
with:
token: ${{ secrets.GITHUB_TOKEN }}
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -69,4 +69,4 @@ repos:

# Increase this value to clear the cache on GitHub actions if nothing else in this file
# has changed. See also same variable in .github/workflows/test.yaml
# CACHE_NUMBER: 0
# CACHE_NUMBER: 1
4 changes: 2 additions & 2 deletions continuous_integration/environment-3.12.yaml
@@ -15,8 +15,8 @@ dependencies:
- filesystem-spec # overridden by git tip below
- gilknocker
- h5py
- ipykernel <6.22.0 # https://github.com/dask/distributed/issues/7688
- ipywidgets <8.0.5 # https://github.com/dask/distributed/issues/7688
- ipykernel
- ipywidgets
- jinja2
- locket
- msgpack-python
2 changes: 1 addition & 1 deletion continuous_integration/gpuci/axis.yaml
@@ -9,6 +9,6 @@ LINUX_VER:
- ubuntu20.04

RAPIDS_VER:
- "24.02"
- "24.04"

excludes:
4 changes: 2 additions & 2 deletions continuous_integration/scripts/test_report.py
@@ -498,9 +498,9 @@ def main(argv: list[str] | None = None) -> None:
)
overall = {name: grouped.get_group(name) for name in grouped.groups}

# Get all of the workflow run timestamps that we wound up with, which we can use
# Get all the workflow run timestamps that we wound up with, which we can use
# below to align the different groups.
times = set()
times: set = set()
for df in overall.values():
times.update(df.date.unique())

18 changes: 14 additions & 4 deletions distributed/client.py
@@ -41,6 +41,7 @@
ensure_dict,
format_bytes,
funcname,
parse_bytes,
parse_timedelta,
shorten_traceback,
typename,
@@ -1576,7 +1577,7 @@ async def _handle_report(self):

breakout = False
for msg in msgs:
logger.debug("Client receives message %s", msg)
logger.debug("Client %s receives message %s", self.id, msg)

if "status" in msg and "error" in msg["status"]:
typ, exc, tb = clean_exception(**msg)
@@ -3095,8 +3096,12 @@ def _get_computation_code(
module_name = fr.f_back.f_globals["__name__"] # type: ignore
if module_name == "__channelexec__":
break # execnet; pytest-xdist # pragma: nocover
try:
module_name = sys.modules[module_name].__name__
except KeyError:
# Ignore pathological cases where the module name isn't in `sys.modules`
break
# Ignore IPython related wrapping functions to user code
module_name = sys.modules[module_name].__name__
if module_name.endswith("interactiveshell"):
break

@@ -3158,7 +3163,9 @@ def _graph_to_futures(
header, frames = serialize(ToPickle(dsk), on_error="raise")

pickled_size = sum(map(nbytes, [header] + frames))
if pickled_size > 10_000_000:
if pickled_size > parse_bytes(
dask.config.get("distributed.admin.large-graph-warning-threshold")
):
warnings.warn(
f"Sending large graph of size {format_bytes(pickled_size)}.\n"
"This may cause some slowdown.\n"
@@ -4924,7 +4931,10 @@ async def _register_scheduler_plugin(
)

def register_scheduler_plugin(
self, plugin: SchedulerPlugin, name: str | None = None, idempotent: bool = False
self,
plugin: SchedulerPlugin,
name: str | None = None,
idempotent: bool | None = None,
):
"""
Register a scheduler plugin.
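
A minimal usage sketch of the public API whose signature changes above. MyPlugin and the marker attribute are hypothetical; the new idempotent=None default's exact semantics follow scheduler-side handling not shown in this hunk.

from distributed import Client
from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):  # hypothetical example plugin
    async def start(self, scheduler):
        scheduler.my_plugin_started = True  # arbitrary marker attribute

client = Client(processes=False)  # assumes an in-process local cluster is acceptable
client.register_scheduler_plugin(MyPlugin(), name="my-plugin")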
25 changes: 13 additions & 12 deletions distributed/comm/tcp.py
@@ -44,14 +44,14 @@
logger = logging.getLogger(__name__)


# Workaround for OpenSSL 1.0.2.
# Can drop with OpenSSL 1.1.1 used by Python 3.10+.
# ref: https://bugs.python.org/issue42853
if sys.version_info < (3, 10):
OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1
else:
OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_size_t) - 1

# We must not load more than this into a buffer at a time
# It's currently unclear why that is
# see
# - https://github.com/dask/distributed/pull/5854
# - https://bugs.python.org/issue42853
# - https://github.com/dask/distributed/pull/8507

C_INT_MAX = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1
MAX_BUFFER_SIZE = MEMORY_LIMIT / 2


@@ -286,8 +286,8 @@ async def write(self, msg, serializers=None, on_error="message"):
2,
range(
0,
each_frame_nbytes + OPENSSL_MAX_CHUNKSIZE,
OPENSSL_MAX_CHUNKSIZE,
each_frame_nbytes + C_INT_MAX,
C_INT_MAX,
),
):
chunk = each_frame[i:j]
@@ -360,7 +360,7 @@ async def read_bytes_rw(stream: IOStream, n: int) -> memoryview:

for i, j in sliding_window(
2,
range(0, n + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
range(0, n + C_INT_MAX, C_INT_MAX),
):
chunk = buf[i:j]
actual = await stream.read_into(chunk) # type: ignore[arg-type]
@@ -432,7 +432,8 @@ class TLS(TCP):
A TLS-specific version of TCP.
"""

max_shard_size = min(OPENSSL_MAX_CHUNKSIZE, TCP.max_shard_size)
# Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1)
max_shard_size = min(C_INT_MAX, TCP.max_shard_size)

def _read_extra(self):
TCP._read_extra(self)
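
This file renames OPENSSL_MAX_CHUNKSIZE to C_INT_MAX and reworks the motivating comments. A small, self-contained sketch of the sliding_window chunking pattern used in write() and read_bytes_rw() above; the buffer and chunk sizes here are toy values, not the real limits.

from tlz import sliding_window

CHUNK = 4  # toy stand-in for C_INT_MAX
buf = bytes(range(10))

# Consecutive range boundaries become (start, stop) pairs that cover
# the buffer in CHUNK-sized slices, mirroring the loops in the diff above.
for start, stop in sliding_window(2, range(0, len(buf) + CHUNK, CHUNK)):
    chunk = buf[start:stop]
    if chunk:
        print(start, stop, chunk)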
68 changes: 35 additions & 33 deletions distributed/dashboard/components/scheduler.py
@@ -51,7 +51,7 @@
from bokeh.themes import Theme
from bokeh.transform import cumsum, factor_cmap, linear_cmap, stack
from jinja2 import Environment, FileSystemLoader
from tlz import curry, pipe, valmap
from tlz import curry, pipe, second, valmap
from tlz.curried import concat, groupby, map
from tornado import escape

@@ -1523,42 +1523,44 @@ def __init__(self, scheduler, **kwargs):
def update(self):
compute_times = defaultdict(float)

for key, ts in self.scheduler.task_prefixes.items():
name = key_split(key)
for action, t in ts.all_durations.items():
for name, tp in self.scheduler.task_prefixes.items():
for action, t in tp.all_durations.items():
if action == "compute":
compute_times[name] += t

# order by largest time first
compute_times = sorted(compute_times.items(), key=lambda x: x[1], reverse=True)

# keep only time which are 2% of max or greater
if compute_times:
max_time = compute_times[0][1] * 0.02
compute_times = [(n, t) for n, t in compute_times if t > max_time]
compute_colors = list()
compute_names = list()
compute_time = list()
total_time = 0
for name, t in compute_times:
compute_names.append(name)
compute_colors.append(ts_color_of(name))
compute_time.append(t)
total_time += t

angles = [t / total_time * 2 * math.pi for t in compute_time]

self.fig.x_range.factors = compute_names

compute_result = dict(
angles=angles,
times=compute_time,
color=compute_colors,
names=compute_names,
formatted_time=[format_time(t) for t in compute_time],
)
if not compute_times:
return

update(self.compute_source, compute_result)
# order by largest time first
compute_times = sorted(compute_times.items(), key=second, reverse=True)

# Keep only times which are 2% of max or greater
max_time = compute_times[0][1] * 0.02
compute_colors = []
compute_names = []
compute_time = []
total_time = 0
for name, t in compute_times:
if t < max_time:
break
compute_names.append(name)
compute_colors.append(ts_color_of(name))
compute_time.append(t)
total_time += t

angles = [t / total_time * 2 * math.pi for t in compute_time]

self.fig.x_range.factors = compute_names

compute_result = dict(
angles=angles,
times=compute_time,
color=compute_colors,
names=compute_names,
formatted_time=[format_time(t) for t in compute_time],
)

update(self.compute_source, compute_result)


class AggregateAction(DashboardComponent):
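
In the update() rewrite above, the sort key switches from a lambda to tlz.second. A tiny sketch of the equivalent behaviour on toy data:

from tlz import second

pairs = [("inc", 1.0), ("sum", 3.5), ("add", 0.2)]
# Equivalent to the previous key=lambda x: x[1]
print(sorted(pairs, key=second, reverse=True))
# [('sum', 3.5), ('inc', 1.0), ('add', 0.2)]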
11 changes: 7 additions & 4 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -1286,22 +1286,25 @@ async def test_compute_per_key(c, s, a, b):

da = pytest.importorskip("dask.array")
x = (da.ones((20, 20), chunks=(10, 10)) + 1).persist(optimize_graph=False)

await x
y = await dask.delayed(inc)(1).persist()
z = (x + x.T) - x.mean(axis=0)
zsum = z.sum()
await c.compute(zsum)

mbk.update()

# Keep only times which are 2% of max or greater.
# This means that the list of names is not stable (but max time is always preserved)
assert mbk.compute_source.data["names"]
assert set(mbk.compute_source.data["names"]).issubset(s.task_prefixes)
assert "angles" in mbk.compute_source.data

http_client = AsyncHTTPClient()
response = await http_client.fetch(
"http://localhost:%d/individual-compute-time-per-key" % s.http_server.port
)
assert response.code == 200
assert ("sum-aggregate") in mbk.compute_source.data["names"]
assert ("add") in mbk.compute_source.data["names"]
assert "angles" in mbk.compute_source.data.keys()


@gen_cluster(scheduler_kwargs={"http_prefix": "foo-bar", "dashboard": True})
2 changes: 1 addition & 1 deletion distributed/deploy/tests/test_adaptive.py
@@ -203,7 +203,7 @@ async def test_adapt_quickly():
assert len(adapt.log) == 1

# Scale up when there is plenty of available work
futures = client.map(slowinc, range(1000), delay=0.100)
futures = client.map(slowinc, range(2, 1002), delay=0.100)
while len(adapt.log) == 1:
await asyncio.sleep(0.01)
assert len(adapt.log) == 2
7 changes: 0 additions & 7 deletions distributed/deploy/tests/test_cluster.py
@@ -32,13 +32,6 @@ async def test_repr():
assert res == expected


@gen_test()
async def test_logs_deprecated():
async with Cluster(asynchronous=True) as cluster:
with pytest.warns(FutureWarning, match="get_logs"):
cluster.logs()


@gen_test()
async def test_cluster_wait_for_worker():
async with LocalCluster(n_workers=2, asynchronous=True) as cluster:
2 changes: 1 addition & 1 deletion distributed/deploy/tests/test_local.py
@@ -1057,7 +1057,7 @@ async def test_threads_per_worker_set_to_0():
n_workers=2, processes=False, threads_per_worker=0, asynchronous=True
) as cluster:
assert len(cluster.workers) == 2
assert all(w.nthreads < CPU_COUNT for w in cluster.workers.values())
assert all(w.state.nthreads < CPU_COUNT for w in cluster.workers.values())


@pytest.mark.parametrize("temporary", [True, False])
10 changes: 9 additions & 1 deletion distributed/deploy/tests/test_spec_cluster.py
@@ -269,7 +269,7 @@ async def test_spec_process():


@gen_test()
async def test_logs():
async def test_get_logs():
worker = {"cls": Worker, "options": {"nthreads": 1}}
async with SpecCluster(
asynchronous=True, scheduler=scheduler, worker=worker
@@ -304,6 +304,14 @@ async def test_logs():
assert set(logs) == {w}


@gen_test()
async def test_logs_deprecated():
async with SpecCluster(asynchronous=True, scheduler=scheduler) as cluster:
with pytest.warns(FutureWarning, match="get_logs"):
logs = await cluster.logs()
assert logs["Scheduler"]


@gen_test()
async def test_scheduler_info():
async with SpecCluster(
18 changes: 12 additions & 6 deletions distributed/diagnostics/tests/test_cudf_diagnostics.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import os

import pytest
@@ -24,22 +25,27 @@ def force_spill():

manager = get_global_manager()

# 24 bytes
# Allocate a new dataframe and trigger spilling by setting a 1 byte limit
df = cudf.DataFrame({"a": [1, 2, 3]})
manager.spill_to_device_limit(1)

return manager.spill_to_device_limit(1)
# Get bytes spilled from GPU to CPU
spill_totals, _ = get_global_manager().statistics.spill_totals[("gpu", "cpu")]
return spill_totals


@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)],
)
@pytest.mark.flaky(reruns=10, reruns_delay=5)
async def test_cudf_metrics(c, s, *workers):
w = list(s.workers.values())[0]
assert "cudf" in w.metrics
assert w.metrics["cudf"]["cudf-spilled"] == 0

await c.run(force_spill)

assert w.metrics["cudf"]["cudf-spilled"] == 24
spill_totals = (await c.run(force_spill, workers=[w.address]))[w.address]
assert spill_totals > 0
# We have to wait for the worker's metrics to update.
# TODO: avoid sleep, is it possible to wait on the next update of metrics?
await asyncio.sleep(1)
assert w.metrics["cudf"]["cudf-spilled"] == spill_totals