Skip to content

Commit

Permalink
various fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Aug 10, 2023
1 parent 3d1c3a6 commit 0c14690
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 11 deletions.
3 changes: 2 additions & 1 deletion distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
key_split,
parse_bytes,
parse_timedelta,
stringify,
)

from distributed.core import Status
Expand Down Expand Up @@ -2371,7 +2372,7 @@ def add_new_nodes_edges(self, new, new_edges, update=False):
continue
xx = x[key]
yy = y[key]
node_key.append(escape.url_escape(key))
node_key.append(escape.url_escape(stringify(key)))
node_x.append(xx)
node_y.append(yy)
node_state.append(task.state)
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def rearrange_by_column_p2p(
f"p2p requires all column names to be str, found: {unsupported}",
)

name = f"shuffle-p2p-{token}"
name = f"shuffle_p2p-{token}"
layer = P2PShuffleLayer(
name,
column,
Expand Down
20 changes: 14 additions & 6 deletions distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import pytest

from dask.utils import key_split

pd = pytest.importorskip("pandas")
dd = pytest.importorskip("dask.dataframe")

Expand Down Expand Up @@ -272,7 +274,13 @@ async def wait_for_tasks_in_state(
raise TypeError(dask_worker)

while (
len([key for key, ts in tasks.items() if prefix in key and ts.state == state])
len(
[
key
for key, ts in tasks.items()
if prefix in key_split(key) and ts.state == state
]
)
< count
):
await asyncio.sleep(interval)
Expand Down Expand Up @@ -752,7 +760,7 @@ async def test_closed_worker_during_unpack(c, s, a, b):
)
out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
x, y = c.compute([df.x.size, out.x.size])
await wait_for_tasks_in_state("shuffle-p2p", "memory", 1, b)
await wait_for_tasks_in_state("shuffle_p2p", "memory", 1, b)
await b.close()

x = await x
Expand All @@ -779,7 +787,7 @@ async def test_restarting_during_unpack_raises_killed_worker(c, s, a, b):
)
out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
out = c.compute(out.x.size)
await wait_for_tasks_in_state("shuffle-p2p", "memory", 1, b)
await wait_for_tasks_in_state("shuffle_p2p", "memory", 1, b)
await b.close()

with pytest.raises(KilledWorker):
Expand All @@ -806,7 +814,7 @@ async def test_crashed_worker_during_unpack(c, s, a):
out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
y = c.compute(out.x.size)

await wait_until_worker_has_tasks("shuffle-p2p", killed_worker_address, 1, s)
await wait_until_worker_has_tasks("shuffle_p2p", killed_worker_address, 1, s)
await n.process.process.kill()

y = await y
Expand Down Expand Up @@ -1171,7 +1179,7 @@ def block(df, in_event, block_event):
out = block(out, in_event, block_event)
out = c.compute(out)

await wait_until_worker_has_tasks("shuffle-p2p", n.worker_address, 1, s)
await wait_until_worker_has_tasks("shuffle_p2p", n.worker_address, 1, s)
await in_event.wait()
await n.process.process.kill()
await block_event.set()
Expand Down Expand Up @@ -1199,7 +1207,7 @@ async def test_crashed_worker_after_shuffle_persisted(c, s, a):
out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
out = out.persist()

await wait_until_worker_has_tasks("shuffle-p2p", n.worker_address, 1, s)
await wait_until_worker_has_tasks("shuffle_p2p", n.worker_address, 1, s)
await out

await n.process.process.kill()
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def transition(
assert self.scheduler
if finish != "waiting":
return
if not key.startswith("shuffle-barrier-"):
if not isinstance(key, str) or not key.startswith("shuffle-barrier-"):
return
if key in self.seen:
return
Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ async def test_resources_str(c, s, a, b):
yy = y.persist(resources={"MyRes": 1})
await wait(yy)

ts_first = s.tasks[y.__dask_keys__([0])]
ts_first = s.tasks[y.__dask_keys__()[0]]
assert ts_first.resource_restrictions == {"MyRes": 1}
ts_last = s.tasks[y.__dask_keys__([-1])]
ts_last = s.tasks[y.__dask_keys__()[-1]]
assert ts_last.resource_restrictions == {"MyRes": 1}


Expand Down

0 comments on commit 0c14690

Please sign in to comment.