Fix P2P shuffle with LocalCluster(..., processes=False) #8125

Merged
merged 8 commits on Aug 23, 2023
Changes from 6 commits
3 changes: 3 additions & 0 deletions distributed/shuffle/_scheduler_plugin.py
@@ -119,6 +119,9 @@
key: str,
@graingert (Member) commented on Aug 23, 2023:

Is the spec: type annotation incorrect? Should it be ToPickle[ShuffleSpec] | ShuffleSpec?

Member Author replied:

I don't mind adjusting this; I honestly don't care too much about typing for a case that shouldn't exist in the first place.

Member Author followed up:

I've adjusted the typing.
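
For reference, a minimal self-contained sketch of the suggested annotation together with the defensive unwrap added in this diff. Assumptions: the stripped-down ToPickle and _StubSpec below are stand-ins for the real distributed classes, and the actual annotation change lands in a later commit than the six shown here.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


class ToPickle(Generic[T]):
    """Stand-in for distributed's ToPickle wrapper; the real one's .data is pickled on the wire."""

    def __init__(self, data: T) -> None:
        self.data = data


@dataclass
class _StubSpec:
    """Stand-in for ShuffleSpec."""

    id: str


def get_or_create(spec: _StubSpec | ToPickle[_StubSpec], key: str, worker: str) -> _StubSpec:
    # Accept both the wrapped and unwrapped form: over inproc:// the wrapper may
    # arrive without ever having been pickled, so unwrap it defensively.
    if isinstance(spec, ToPickle):
        spec = spec.data
    return spec


assert get_or_create(ToPickle(_StubSpec("x")), key="k", worker="w").id == "x"
assert get_or_create(_StubSpec("x"), key="k", worker="w").id == "x"
```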

worker: str,
) -> ToPickle[ShuffleRunSpec]:
# FIXME: Sometimes, this doesn't actually get pickled
Member commented:

Which cases are "sometimes"? Can you create an issue about this and link it?

Member Author replied:

My suspicion is that it's related to inproc://, but I've also seen this mentioned in other parts of the codebase, so I'm not sure about that. I was planning to investigate this soon(ish), but there are more pressing issues for now.
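
For context, a small sketch (not part of this PR) that makes the inproc:// connection visible: with processes=False, LocalCluster keeps the scheduler and workers in one process and uses the in-process transport, which can hand objects across without a real pickle round-trip, so a ToPickle wrapper would plausibly arrive intact.

```python
# Sketch only; the kwargs mirror the new test added in this PR.
import asyncio

from distributed import LocalCluster


async def main() -> None:
    async with LocalCluster(
        n_workers=1,
        processes=False,
        asynchronous=True,
        dashboard_address=":0",
    ) as cluster:
        # With processes=False this is expected to print an "inproc://..." address
        # rather than "tcp://...", i.e. there is no real serialization boundary.
        print(cluster.scheduler_address)


if __name__ == "__main__":
    asyncio.run(main())
```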

if isinstance(spec, ToPickle):
spec = spec.data

Codecov (codecov/patch) warning: added line distributed/shuffle/_scheduler_plugin.py#L124 was not covered by tests.
try:
return self.get(spec.id, worker)
except KeyError:
16 changes: 7 additions & 9 deletions distributed/shuffle/_worker_plugin.py
@@ -218,7 +218,7 @@
if shuffle is None:
shuffle = await self._refresh_shuffle(
shuffle_id=spec.id,
spec=ToPickle(spec),
spec=spec,
key=key,
)

@@ -239,15 +239,15 @@
async def _refresh_shuffle(
self,
shuffle_id: ShuffleId,
spec: ToPickle,
spec: ShuffleSpec,
key: str,
) -> ShuffleRun:
...

async def _refresh_shuffle(
self,
shuffle_id: ShuffleId,
spec: ToPickle | None = None,
spec: ShuffleSpec | None = None,
key: str | None = None,
) -> ShuffleRun:
result: ShuffleRunSpec
@@ -258,14 +258,13 @@
)
else:
result = await self.worker.scheduler.shuffle_get_or_create(
spec=spec,
spec=ToPickle(spec),
key=key,
worker=self.worker.address,
)
# if result["status"] == "error":
# raise RuntimeError(result["message"])
# assert result["status"] == "OK"

# FIXME: Sometimes, this doesn't actually get pickled
if isinstance(result, ToPickle):
result = result.data

Codecov (codecov/patch) warning: added line distributed/shuffle/_worker_plugin.py#L267 was not covered by tests.
if self.closed:
raise ShuffleClosedError(f"{self} has already been closed")
if shuffle_id in self.shuffles:
@@ -287,7 +286,6 @@
extension._runs_cleanup_condition.notify_all()

self.worker._ongoing_background_tasks.call_soon(_, self, existing)

shuffle: ShuffleRun = result.spec.create_run_on_worker(
result.run_id, result.worker_for, self
)
24 changes: 23 additions & 1 deletion distributed/shuffle/tests/test_shuffle.py
@@ -23,7 +23,7 @@
dd = pytest.importorskip("dask.dataframe")

import dask
from dask.distributed import Event, Nanny, Worker
from dask.distributed import Event, LocalCluster, Nanny, Worker
from dask.utils import stringify

from distributed.client import Client
@@ -187,6 +187,28 @@ async def test_basic_integration(c, s, a, b, lose_annotations, npartitions):
await check_scheduler_cleanup(s)


@pytest.mark.parametrize("processes", [True, False])
@gen_test()
async def test_basic_integration_local_cluster(processes):
async with LocalCluster(
n_workers=2,
processes=processes,
asynchronous=True,
dashboard_address=":0",
) as cluster:
df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-10",
dtypes={"x": float, "y": float},
freq="10 s",
)
c = cluster.get_client()
out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
x, y = c.compute([df, out])
x, y = await c.gather([x, y])
dd.assert_eq(x, y)


@pytest.mark.parametrize("npartitions", [None, 1, 20])
@gen_cluster(client=True)
async def test_shuffle_with_array_conversion(c, s, a, b, lose_annotations, npartitions):
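
For completeness, a synchronous sketch of the user-facing scenario this PR fixes, mirroring the new test above; the column names, date range, and frequency are just the test's illustrative values.

```python
import dask
import dask.dataframe as dd
from distributed import LocalCluster

if __name__ == "__main__":
    with LocalCluster(n_workers=2, processes=False, dashboard_address=":0") as cluster:
        client = cluster.get_client()
        df = dask.datasets.timeseries(
            start="2000-01-01",
            end="2000-01-10",
            dtypes={"x": float, "y": float},
            freq="10 s",
        )
        # Before this fix, a p2p shuffle on an in-process LocalCluster failed.
        shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p")
        out = client.compute(shuffled)  # explicit client, as in the test above
        print(out.result().head())
```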