diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index d0ddc21f90..3cdf8e70bf 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -1346,7 +1346,7 @@ async def test_shuffling(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - df2 = dd.shuffle.shuffle(df, "x").persist() + df2 = df.shuffle("x").persist() start = time() while not ss.source.data["comm_written"]: ss.update() diff --git a/distributed/protocol/tests/test_highlevelgraph.py b/distributed/protocol/tests/test_highlevelgraph.py index 23a5768241..33de277a53 100644 --- a/distributed/protocol/tests/test_highlevelgraph.py +++ b/distributed/protocol/tests/test_highlevelgraph.py @@ -1,5 +1,7 @@ from __future__ import annotations +import contextlib + import pytest np = pytest.importorskip("numpy") @@ -173,7 +175,13 @@ async def test_dataframe_annotations(c, s, a, b): acol = df["a"] bcol = df["b"] - with dask.annotate(retries=retries): + ctx = contextlib.nullcontext() + if dd._dask_expr_enabled(): + ctx = pytest.warns( + UserWarning, match="Annotations will be ignored when using query-planning" + ) + + with dask.annotate(retries=retries), ctx: df = acol + bcol with dask.config.set(optimization__fuse__active=False): @@ -182,5 +190,6 @@ async def test_dataframe_annotations(c, s, a, b): assert rdf.dtypes == np.float64 assert (rdf == 10.0).all() - # There is an annotation match per partition (i.e. task) - assert plugin.retry_matches == df.npartitions + if not dd._dask_expr_enabled(): + # There is an annotation match per partition (i.e. task) + assert plugin.retry_matches == df.npartitions diff --git a/distributed/shuffle/tests/test_graph.py b/distributed/shuffle/tests/test_graph.py index cbd6b696d3..7fa4d5b2ce 100644 --- a/distributed/shuffle/tests/test_graph.py +++ b/distributed/shuffle/tests/test_graph.py @@ -5,17 +5,17 @@ import pytest pd = pytest.importorskip("pandas") -pytest.importorskip("dask.dataframe") +dd = pytest.importorskip("dask.dataframe") pytest.importorskip("pyarrow") import dask -import dask.dataframe as dd from dask.blockwise import Blockwise from dask.utils_test import hlg_layer_topological from distributed.utils_test import gen_cluster +@pytest.mark.skipif(condition=dd._dask_expr_enabled(), reason="no HLG") def test_basic(client): df = dd.demo.make_timeseries(freq="15D", partition_freq="30D") df["name"] = df["name"].astype("string[python]") @@ -26,11 +26,6 @@ def test_basic(client): assert isinstance(hlg_layer_topological(opt.dask, 0), Blockwise) # blockwise -> barrier -> unpack -> drop_by_shallow_copy - with dask.config.set({"dataframe.shuffle.method": "tasks"}): - tasks_shuffled = df.shuffle("id") - dd.utils.assert_eq(p2p_shuffled, tasks_shuffled, scheduler=client) - # ^ NOTE: this works because `assert_eq` sorts the rows before comparing - @pytest.mark.parametrize("dtype", ["csingle", "cdouble", "clongdouble"]) def test_raise_on_complex_numbers(dtype): diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py index 1f6eb2f33e..27786e963c 100644 --- a/distributed/shuffle/tests/test_merge.py +++ b/distributed/shuffle/tests/test_merge.py @@ -11,7 +11,6 @@ from distributed import Worker from distributed.shuffle._core import ShuffleId, ShuffleSpec, id_from_key -from distributed.shuffle._merge import hash_join from distributed.shuffle._worker_plugin import ShuffleRun, _ShuffleRunManager from distributed.utils_test import gen_cluster @@ -21,7 +20,8 @@ import dask from dask.dataframe._compat import PANDAS_GE_200, tm from dask.dataframe.utils import assert_eq -from dask.utils_test import hlg_layer_topological + +from distributed import get_client try: import pyarrow as pa @@ -31,15 +31,10 @@ pytestmark = pytest.mark.ci1 -def list_eq(aa, bb): - if isinstance(aa, dd.DataFrame): - a = aa.compute(scheduler="sync") - else: - a = aa - if isinstance(bb, dd.DataFrame): - b = bb.compute(scheduler="sync") - else: - b = bb +async def list_eq(a, b): + c = get_client() + a = await c.compute(a) if isinstance(a, dd.DataFrame) else a + b = await c.compute(b) if isinstance(b, dd.DataFrame) else b tm.assert_index_equal(a.columns, b.columns) if isinstance(a, pd.DataFrame): @@ -52,6 +47,7 @@ def list_eq(aa, bb): dd._compat.assert_numpy_array_equal(av, bv) +@pytest.mark.skipif(dd._dask_expr_enabled(), reason="pyarrow>=7.0.0 already required") @gen_cluster(client=True) async def test_minimal_version(c, s, a, b): no_pyarrow_ctx = ( @@ -81,30 +77,32 @@ async def test_basic_merge(c, s, a, b, how): B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]}) b = dd.repartition(B, [0, 2, 5]) - joined = hash_join(a, "y", b, "y", how) + joined = a.merge(b, left_on="y", right_on="y", how=how) + + if dd._dask_expr_enabled(): + # Ensure we're using a hash join + from dask_expr._merge import HashJoinP2P + + assert any( + isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk() + ) - assert not hlg_layer_topological(joined.dask, -1).is_materialized() - result = await c.compute(joined) expected = pd.merge(A, B, how, "y") - list_eq(result, expected) + await list_eq(joined, expected) # Different columns and npartitions - joined = hash_join(a, "x", b, "z", "outer", npartitions=3) - assert not hlg_layer_topological(joined.dask, -1).is_materialized() - assert joined.npartitions == 3 + joined = a.merge(b, left_on="x", right_on="z", how="outer") - result = await c.compute(joined) expected = pd.merge(A, B, "outer", None, "x", "z") - - list_eq(result, expected) + await list_eq(joined, expected) assert ( - hash_join(a, "y", b, "y", "inner")._name - == hash_join(a, "y", b, "y", "inner")._name + a.merge(b, left_on="y", right_on="y", how="inner")._name + == a.merge(b, left_on="y", right_on="y", how="inner")._name ) assert ( - hash_join(a, "y", b, "y", "inner")._name - != hash_join(a, "y", b, "y", "outer")._name + a.merge(b, left_on="y", right_on="y", how="inner")._name + != a.merge(b, left_on="y", right_on="y", how="outer")._name ) @@ -188,82 +186,41 @@ async def test_merge(c, s, a, b, how, disk): pd.merge(A, B, left_index=True, right_index=True, how=how), ) joined = dd.merge(a, b, on="y", how=how) - result = await c.compute(joined) - list_eq(result, pd.merge(A, B, on="y", how=how)) + await list_eq(joined, pd.merge(A, B, on="y", how=how)) assert all(d is None for d in joined.divisions) - list_eq( - await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)), + await list_eq( + dd.merge(a, b, left_on="x", right_on="z", how=how), pd.merge(A, B, left_on="x", right_on="z", how=how), ) - list_eq( - await c.compute( - dd.merge( - a, - b, - left_on="x", - right_on="z", - how=how, - suffixes=("1", "2"), - ) - ), + await list_eq( + dd.merge(a, b, left_on="x", right_on="z", how=how, suffixes=("1", "2")), pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")), ) - list_eq( - await c.compute(dd.merge(a, b, how=how)), - pd.merge(A, B, how=how), - ) - list_eq( - await c.compute(dd.merge(a, B, how=how)), - pd.merge(A, B, how=how), - ) - list_eq( - await c.compute(dd.merge(A, b, how=how)), - pd.merge(A, B, how=how), - ) - # Note: No await since A and B are both pandas dataframes and this doesn't - # actually submit anything - list_eq( - c.compute(dd.merge(A, B, how=how)), - pd.merge(A, B, how=how), - ) - - list_eq( - await c.compute(dd.merge(a, b, left_index=True, right_index=True, how=how)), + await list_eq(dd.merge(a, b, how=how), pd.merge(A, B, how=how)) + await list_eq(dd.merge(a, B, how=how), pd.merge(A, B, how=how)) + await list_eq(dd.merge(A, b, how=how), pd.merge(A, B, how=how)) + await list_eq(dd.merge(A, B, how=how), pd.merge(A, B, how=how)) + await list_eq( + dd.merge(a, b, left_index=True, right_index=True, how=how), pd.merge(A, B, left_index=True, right_index=True, how=how), ) - list_eq( - await c.compute( - dd.merge( - a, - b, - left_index=True, - right_index=True, - how=how, - suffixes=("1", "2"), - ) + await list_eq( + dd.merge( + a, b, left_index=True, right_index=True, how=how, suffixes=("1", "2") ), pd.merge( A, B, left_index=True, right_index=True, how=how, suffixes=("1", "2") ), ) - list_eq( - await c.compute(dd.merge(a, b, left_on="x", right_index=True, how=how)), + await list_eq( + dd.merge(a, b, left_on="x", right_index=True, how=how), pd.merge(A, B, left_on="x", right_index=True, how=how), ) - list_eq( - await c.compute( - dd.merge( - a, - b, - left_on="x", - right_index=True, - how=how, - suffixes=("1", "2"), - ) - ), + await list_eq( + dd.merge(a, b, left_on="x", right_index=True, how=how, suffixes=("1", "2")), pd.merge(A, B, left_on="x", right_index=True, how=how, suffixes=("1", "2")), ) @@ -391,65 +348,25 @@ async def test_merge_by_multiple_columns(c, s, a, b, how): ) # hash join - list_eq( - await c.compute( - dd.merge( - ddl, - ddr, - how=how, - left_on="a", - right_on="d", - ) - ), + await list_eq( + dd.merge(ddl, ddr, how=how, left_on="a", right_on="d"), pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"), ) - list_eq( - await c.compute( - dd.merge( - ddl, - ddr, - how=how, - left_on="b", - right_on="e", - ) - ), + await list_eq( + dd.merge(ddl, ddr, how=how, left_on="b", right_on="e"), pd.merge(pdl, pdr, how=how, left_on="b", right_on="e"), ) - - list_eq( - await c.compute( - dd.merge( - ddr, - ddl, - how=how, - left_on="d", - right_on="a", - ) - ), + await list_eq( + dd.merge(ddr, ddl, how=how, left_on="d", right_on="a"), pd.merge(pdr, pdl, how=how, left_on="d", right_on="a"), ) - list_eq( - await c.compute( - dd.merge( - ddr, - ddl, - how=how, - left_on="e", - right_on="b", - ) - ), + await list_eq( + dd.merge(ddr, ddl, how=how, left_on="e", right_on="b"), pd.merge(pdr, pdl, how=how, left_on="e", right_on="b"), ) - - list_eq( - await c.compute( - dd.merge( - ddl, - ddr, - how=how, - left_on=["a", "b"], - right_on=["d", "e"], - ) + await list_eq( + dd.merge( + ddl, ddr, how=how, left_on=["a", "b"], right_on=["d", "e"] ), pd.merge( pdl, pdr, how=how, left_on=["a", "b"], right_on=["d", "e"] diff --git a/distributed/shuffle/tests/test_merge_column_and_index.py b/distributed/shuffle/tests/test_merge_column_and_index.py index b358650205..ead09eebd2 100644 --- a/distributed/shuffle/tests/test_merge_column_and_index.py +++ b/distributed/shuffle/tests/test_merge_column_and_index.py @@ -20,7 +20,6 @@ import dask from dask.dataframe.utils import assert_eq -from distributed.shuffle import HashJoinP2PLayer, P2PShuffleLayer from distributed.utils_test import gen_cluster @@ -178,11 +177,6 @@ async def test_merge_unknown_to_unknown( # Merge unknown to unknown with dask.config.set({"dataframe.shuffle.method": "p2p"}): result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how) - if not any( - isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer)) - for layer in result_graph.dask.layers.values() - ): - pytest.skip("No HashJoin or P2P layer involved") result = await c.compute(result_graph) # Assertions assert_eq(result, expected) diff --git a/distributed/shuffle/tests/test_metrics.py b/distributed/shuffle/tests/test_metrics.py index 965d469390..6956b5f755 100644 --- a/distributed/shuffle/tests/test_metrics.py +++ b/distributed/shuffle/tests/test_metrics.py @@ -8,6 +8,8 @@ from distributed.utils_test import gen_cluster da = pytest.importorskip("dask.array") +dd = pytest.importorskip("dask.dataframe") +from distributed.shuffle.tests.utils import UNPACK_PREFIX def assert_metrics(s: Scheduler, *keys: tuple[str, ...]) -> None: @@ -79,7 +81,7 @@ async def test_dataframe(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x", npartitions=20) + shuffled = df.shuffle("x", npartitions=20) await c.compute(shuffled) await a.heartbeat() await b.heartbeat() @@ -91,10 +93,10 @@ async def test_dataframe(c, s, a, b): ("execute", "shuffle-transfer", "p2p-shards", "bytes"), ("execute", "shuffle-transfer", "p2p-shards", "count"), ("execute", "shuffle-transfer", "p2p-comms-limiter", "count"), - ("execute", "shuffle_p2p", "p2p-disk-read", "bytes"), - ("execute", "shuffle_p2p", "p2p-disk-read", "count"), - ("execute", "shuffle_p2p", "p2p-get-output-cpu", "seconds"), - ("execute", "shuffle_p2p", "p2p-get-output-noncpu", "seconds"), + ("execute", UNPACK_PREFIX, "p2p-disk-read", "bytes"), + ("execute", UNPACK_PREFIX, "p2p-disk-read", "count"), + ("execute", UNPACK_PREFIX, "p2p-get-output-cpu", "seconds"), + ("execute", UNPACK_PREFIX, "p2p-get-output-noncpu", "seconds"), ("p2p", "background-comms", "compress", "seconds"), ("p2p", "background-comms", "idle", "seconds"), ("p2p", "background-comms", "process", "bytes"), diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index ad45a08d2f..51468a4973 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -58,7 +58,7 @@ split_by_worker, ) from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin, _ShuffleRunManager -from distributed.shuffle.tests.utils import AbstractShuffleTestPool +from distributed.shuffle.tests.utils import UNPACK_PREFIX, AbstractShuffleTestPool from distributed.utils import Deadline from distributed.utils_test import ( async_poll_for, @@ -119,6 +119,7 @@ async def check_scheduler_cleanup( assert not plugin.heartbeats +@pytest.mark.skipif(dd._dask_expr_enabled(), reason="pyarrow>=7.0.0 already required") @gen_cluster(client=True) async def test_minimal_version(c, s, a, b): no_pyarrow_ctx = ( @@ -136,7 +137,7 @@ async def test_minimal_version(c, s, a, b): with pytest.raises( ModuleNotFoundError, match="requires pyarrow" ), dask.config.set({"dataframe.shuffle.method": "p2p"}): - await c.compute(dd.shuffle.shuffle(df, "x")) + await c.compute(df.shuffle("x")) @pytest.mark.gpu @@ -162,7 +163,7 @@ async def test_basic_cudf_support(c, s, a, b): freq="10 s", ).to_backend("cudf") with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") assert shuffled.npartitions == df.npartitions result, expected = await c.compute([shuffled, df], sync=True) @@ -198,7 +199,7 @@ async def test_basic_integration(c, s, a, b, npartitions, disk): with dask.config.set( {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk} ): - shuffled = dd.shuffle.shuffle(df, "x", npartitions=npartitions) + shuffled = df.shuffle("x", npartitions=npartitions) if npartitions is None: assert shuffled.npartitions == df.npartitions else: @@ -228,7 +229,7 @@ async def test_basic_integration_local_cluster(processes): ) c = cluster.get_client() with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") x, y = c.compute([df, out]) x, y = await c.gather([x, y]) dd.assert_eq(x, y) @@ -244,7 +245,7 @@ async def test_shuffle_with_array_conversion(c, s, a, b, npartitions): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x", npartitions=npartitions).values + out = df.shuffle("x", npartitions=npartitions).values if npartitions == 1: # FIXME: distributed#7816 @@ -270,7 +271,7 @@ def test_shuffle_before_categorize(loop_in_thread): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - df = dd.shuffle.shuffle(df, "x") + df = df.shuffle("x") df.categorize(columns=["y"]) c.compute(df) @@ -284,8 +285,8 @@ async def test_concurrent(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - x = dd.shuffle.shuffle(df, "x") - y = dd.shuffle.shuffle(df, "y") + x = df.shuffle("x") + y = df.shuffle("y") df, x, y = await c.compute([df, x, y], sync=True) dd.assert_eq(x, df, check_index=False) dd.assert_eq(y, df, check_index=False) @@ -304,7 +305,7 @@ async def test_bad_disk(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) while not get_active_shuffle_runs(a): @@ -393,7 +394,7 @@ async def test_closed_worker_during_transfer(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b) await b.close() @@ -420,7 +421,7 @@ async def test_restarting_during_transfer_raises_killed_worker(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = c.compute(out.x.size) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b) await b.close() @@ -463,7 +464,7 @@ async def test_get_or_create_from_dangling_transfer(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = c.compute(out.x.size) shuffle_extA = a.plugins["shuffle"] @@ -503,7 +504,7 @@ async def test_crashed_worker_during_transfer(c, s, a): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) await wait_until_worker_has_tasks( "shuffle-transfer", killed_worker_address, 1, s @@ -538,7 +539,7 @@ async def test_restarting_does_not_deadlock(c, s): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") fut = c.compute(out.x.size) await wait_until_worker_has_tasks( "shuffle-transfer", b.worker_address, 1, s @@ -574,7 +575,7 @@ def mock_get_worker_for_range_sharding( freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b, 0.001) await b.close() @@ -609,7 +610,7 @@ def mock_mock_get_worker_for_range_sharding( freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) await wait_until_worker_has_tasks( "shuffle-transfer", n.worker_address, 1, s @@ -635,7 +636,7 @@ async def test_closed_bystanding_worker_during_shuffle(c, s, w1, w2, w3): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, w1) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, w2) @@ -672,7 +673,7 @@ async def test_exception_on_close_cleans_up(c, s, caplog): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") await c.compute([shuffled, df], sync=True) assert any("test-exception-on-close" in record.message for record in caplog.records) @@ -704,7 +705,7 @@ async def test_closed_worker_during_barrier(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) shuffle_id = await wait_until_new_shuffle_is_initialized(s) key = barrier_key(shuffle_id) @@ -767,7 +768,7 @@ async def test_restarting_during_barrier_raises_killed_worker(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = c.compute(out.x.size) shuffle_id = await wait_until_new_shuffle_is_initialized(s) key = barrier_key(shuffle_id) @@ -812,7 +813,7 @@ async def test_closed_other_worker_during_barrier(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -875,7 +876,7 @@ async def test_crashed_other_worker_during_barrier(c, s, a): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) shuffle_id = await wait_until_new_shuffle_is_initialized(s) key = barrier_key(shuffle_id) @@ -918,9 +919,9 @@ async def test_closed_worker_during_unpack(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) - await wait_for_tasks_in_state("shuffle_p2p", "memory", 1, b) + await wait_for_tasks_in_state(UNPACK_PREFIX, "memory", 1, b) await b.close() result, expected = await fut @@ -945,9 +946,9 @@ async def test_restarting_during_unpack_raises_killed_worker(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = c.compute(out.x.size) - await wait_for_tasks_in_state("shuffle_p2p", "memory", 1, b) + await wait_for_tasks_in_state(UNPACK_PREFIX, "memory", 1, b) await b.close() with pytest.raises(KilledWorker): @@ -972,10 +973,10 @@ async def test_crashed_worker_during_unpack(c, s, a): ) expected = await c.compute(df) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") result = c.compute(shuffled) - await wait_until_worker_has_tasks("shuffle_p2p", killed_worker_address, 1, s) + await wait_until_worker_has_tasks(UNPACK_PREFIX, killed_worker_address, 1, s) await n.process.process.kill() result = await result @@ -997,7 +998,7 @@ async def test_heartbeat(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() while not s.plugins["shuffle"].heartbeats: @@ -1213,7 +1214,7 @@ async def test_head(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = await out.head(compute=False).persist() # Only ask for one key assert list(os.walk(a.local_directory)) == a_files # cleaned up files? @@ -1243,7 +1244,7 @@ async def test_clean_after_forgotten_early(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b) @@ -1262,7 +1263,7 @@ async def test_tail(c, s, a, b): freq="1 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - x = dd.shuffle.shuffle(df, "x") + x = df.shuffle("x") full = await x.persist() ntasks_full = len(s.tasks) del full @@ -1294,7 +1295,7 @@ async def test_repeat_shuffle_instance(c, s, a, b, wait_until_forgotten): freq="100 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x").size + out = df.shuffle("x").size await c.compute(out) if wait_until_forgotten: @@ -1325,14 +1326,14 @@ async def test_repeat_shuffle_operation(c, s, a, b, wait_until_forgotten): freq="100 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - await c.compute(dd.shuffle.shuffle(df, "x")) + await c.compute(df.shuffle("x")) if wait_until_forgotten: while s.tasks: await asyncio.sleep(0) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - await c.compute(dd.shuffle.shuffle(df, "x")) + await c.compute(df.shuffle("x")) await check_worker_cleanup(a) await check_worker_cleanup(b) @@ -1344,14 +1345,13 @@ async def test_crashed_worker_after_shuffle(c, s, a): in_event = Event() block_event = Event() - @dask.delayed def block(df, in_event, block_event): in_event.set() block_event.wait() return df async with Nanny(s.address, nthreads=1) as n: - df = df = dask.datasets.timeseries( + df = dask.datasets.timeseries( start="2000-01-01", end="2000-03-01", dtypes={"x": float, "y": float}, @@ -1359,14 +1359,19 @@ def block(df, in_event, block_event): seed=42, ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") - in_event = Event() - block_event = Event() - with dask.annotate(workers=[n.worker_address], allow_other_workers=True): - out = block(out, in_event, block_event) + out = df.shuffle("x") + out = c.compute(out) + out = c.submit( + block, + out, + in_event, + block_event, + workers=[n.worker_address], + allow_other_workers=True, + ) - await wait_until_worker_has_tasks("shuffle_p2p", n.worker_address, 1, s) + await wait_until_worker_has_tasks(UNPACK_PREFIX, n.worker_address, 1, s) await in_event.wait() await n.process.process.kill() await block_event.set() @@ -1392,10 +1397,10 @@ async def test_crashed_worker_after_shuffle_persisted(c, s, a): seed=42, ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() - await wait_until_worker_has_tasks("shuffle_p2p", n.worker_address, 1, s) + await wait_until_worker_has_tasks(UNPACK_PREFIX, n.worker_address, 1, s) await out await n.process.process.kill() @@ -1420,7 +1425,7 @@ async def test_closed_worker_between_repeats(c, s, w1, w2, w3): seed=42, ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") await c.compute(out.head(compute=False)) await check_worker_cleanup(w1) @@ -1453,7 +1458,7 @@ async def test_new_worker(c, s, a, b): freq="1 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") persisted = shuffled.persist() while not s.plugins["shuffle"].active_shuffles: await asyncio.sleep(0.001) @@ -1494,6 +1499,9 @@ async def test_multi(c, s, a, b): await check_scheduler_cleanup(s) +@pytest.mark.skipif( + dd._dask_expr_enabled(), reason="worker restrictions are not supported in dask-expr" +) @gen_cluster(client=True) async def test_restrictions(c, s, a, b): df = dask.datasets.timeseries( @@ -1507,8 +1515,8 @@ async def test_restrictions(c, s, a, b): assert not b.data with dask.config.set({"dataframe.shuffle.method": "p2p"}): - x = dd.shuffle.shuffle(df, "x") - y = dd.shuffle.shuffle(df, "y") + x = df.shuffle("x") + y = df.shuffle("y") x = x.persist(workers=b.address) y = y.persist(workers=a.address) @@ -1529,7 +1537,7 @@ async def test_delete_some_results(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - x = dd.shuffle.shuffle(df, "x").persist() + x = df.shuffle("x").persist() while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()): await asyncio.sleep(0.01) @@ -1551,7 +1559,7 @@ async def test_add_some_results(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - x = dd.shuffle.shuffle(df, "x") + x = df.shuffle("x") y = x.partitions[: x.npartitions // 2].persist() while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()): @@ -1579,7 +1587,7 @@ async def test_clean_after_close(c, s, a, b): ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() await wait_for_tasks_in_state("shuffle-transfer", "executing", 1, a) @@ -1934,7 +1942,7 @@ async def test_deduplicate_stale_transfer(c, s, a, b, wait_until_forgotten): freq="100 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") shuffled = shuffled.persist() shuffle_extA = a.plugins["shuffle"] @@ -1948,7 +1956,7 @@ async def test_deduplicate_stale_transfer(c, s, a, b, wait_until_forgotten): while s.tasks or get_active_shuffle_runs(a) or get_active_shuffle_runs(b): await asyncio.sleep(0) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") result = c.compute(shuffled) await wait_until_new_shuffle_is_initialized(s) shuffle_extA.block_shuffle_receive.set() @@ -1986,7 +1994,7 @@ async def test_handle_stale_barrier(c, s, a, b, wait_until_forgotten): freq="100 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") shuffled = shuffled.persist() shuffle_extA = a.plugins["shuffle"] @@ -2006,7 +2014,7 @@ async def test_handle_stale_barrier(c, s, a, b, wait_until_forgotten): await asyncio.sleep(0) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - shuffled = dd.shuffle.shuffle(df, "x") + shuffled = df.shuffle("x") fut = c.compute([shuffled, df], sync=True) await wait_until_new_shuffle_is_initialized(s) shuffle_extA.block_barrier.set() @@ -2045,7 +2053,7 @@ async def test_shuffle_run_consistency(c, s, a): ) # Initialize first shuffle execution with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -2069,7 +2077,7 @@ async def test_shuffle_run_consistency(c, s, a): # Initialize second shuffle execution with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() new_shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -2096,7 +2104,7 @@ async def test_shuffle_run_consistency(c, s, a): # Create an unrelated shuffle on a different column with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "y") + out = df.shuffle("y") out = out.persist() independent_shuffle_id = await wait_until_new_shuffle_is_initialized(s) assert shuffle_id != independent_shuffle_id @@ -2137,7 +2145,7 @@ async def test_fail_fetch_race(c, s, a): freq="100 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -2218,7 +2226,7 @@ async def test_replace_stale_shuffle(c, s, a, b): ) # Initialize first shuffle execution with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -2244,7 +2252,7 @@ async def test_replace_stale_shuffle(c, s, a, b): # Initialize second shuffle execution with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = out.persist() await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a) @@ -2299,7 +2307,8 @@ def make_partition(i): return pd.DataFrame({"a": np.random.random(10), "b": [None] * 10}) return pd.DataFrame({"a": np.random.random(10), "b": np.random.random(10)}) - ddf = dd.from_map(make_partition, range(50)) + with dask.config.set({"dataframe.convert-string": False}): + ddf = dd.from_map(make_partition, range(5)) with dask.config.set({"dataframe.shuffle.method": "p2p"}): out = ddf.shuffle(on="a", ignore_index=True) result, expected = c.compute([ddf, out]) @@ -2544,7 +2553,7 @@ async def test_unpack_gets_rescheduled_from_non_participating_worker(c, s, a): # Restrict an unpack task to B so that the previously non-participating # worker takes part in the unpack phase for key in s.tasks: - if key_split(key) == "shuffle_p2p": + if key_split(key) == UNPACK_PREFIX: s.set_restrictions({key: {b.address}}) break @@ -2580,7 +2589,7 @@ async def test_unpack_is_non_rootish(c, s, a, b): await scheduler_plugin.in_barrier.wait() - unpack_tss = [ts for key, ts in s.tasks.items() if key_split(key) == "shuffle_p2p"] + unpack_tss = [ts for key, ts in s.tasks.items() if key_split(key) == UNPACK_PREFIX] assert len(unpack_tss) == 20 assert not any(s.is_rootish(ts) for ts in unpack_tss) del unpack_tss @@ -2624,7 +2633,7 @@ async def test_flaky_connect_fails_without_retry(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - x = dd.shuffle.shuffle(df, "x") + x = df.shuffle("x") rpc = await FlakyConnectionPool(failing_connects=1) @@ -2655,7 +2664,7 @@ async def test_flaky_connect_recover_with_retry(c, s, a, b): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - x = dd.shuffle.shuffle(df, "x") + x = df.shuffle("x") rpc = await FlakyConnectionPool(failing_connects=1) with captured_logger("distributed.utils_comm") as caplog: @@ -2696,7 +2705,7 @@ async def test_barrier_handles_stale_resumed_transfer(c, s, *workers): freq="10 s", ) with dask.config.set({"dataframe.shuffle.method": "p2p"}): - out = dd.shuffle.shuffle(df, "x") + out = df.shuffle("x") out = c.compute(out) shuffle_id = await wait_until_new_shuffle_is_initialized(s) key = barrier_key(shuffle_id) diff --git a/distributed/shuffle/tests/utils.py b/distributed/shuffle/tests/utils.py index d1feaebade..7191be5d05 100644 --- a/distributed/shuffle/tests/utils.py +++ b/distributed/shuffle/tests/utils.py @@ -6,6 +6,15 @@ from distributed.core import PooledRPCCall from distributed.shuffle._core import ShuffleId, ShuffleRun +UNPACK_PREFIX = "shuffle_p2p" +try: + import dask.dataframe as dd + + if dd._dask_expr_enabled(): + UNPACK_PREFIX = "p2pshuffle" +except ImportError: + pass + class PooledRPCShuffle(PooledRPCCall): def __init__(self, shuffle: ShuffleRun): diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 58460a8da6..a214bfae84 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -4829,7 +4829,7 @@ def make_err(x): raise ValueError return x - df2 = df.a.map(make_err) + df2 = df.a.map(make_err, meta=df.a) f = c.compute(df2) error_f = await c._get_errored_future(f) function, args, kwargs = await c._get_components_from_future(error_f) @@ -4937,7 +4937,7 @@ async def test_recreate_task_collection(c, s, a, b): df = dd.from_pandas(pd.DataFrame({"a": [0, 1, 2, 3, 4]}), chunksize=2) - df2 = df.a.map(lambda x: x + 1) + df2 = df.a.map(inc, meta=df.a) f = c.compute(df2) function, args, kwargs = await c._get_components_from_future(f) @@ -6559,11 +6559,9 @@ async def test_config_inherited_by_subprocess(): @gen_cluster(client=True) async def test_futures_of_sorted(c, s, a, b): - pytest.importorskip("dask.dataframe") - df = await dask.datasets.timeseries(dtypes={"x": int}).persist() - futures = futures_of(df) - for k, f in zip(df.__dask_keys__(), futures): - assert str(k) in str(f) + b = dask.bag.from_sequence(range(10), npartitions=5).persist() + futures = futures_of(b) + assert [fut.key for fut in futures] == [k for k in b.__dask_keys__()] @gen_cluster( diff --git a/distributed/tests/test_dask_collections.py b/distributed/tests/test_dask_collections.py index 0ed81e849e..979e85d463 100644 --- a/distributed/tests/test_dask_collections.py +++ b/distributed/tests/test_dask_collections.py @@ -61,11 +61,11 @@ async def test_dataframes(c, s, a, b): exprs = [ lambda df: df.x.mean(), lambda df: df.y.std(), - lambda df: df.assign(z=df.x + df.y).drop_duplicates(), + lambda df: df.assign(z=df.x + df.y).drop_duplicates(split_out=1), lambda df: df.index, lambda df: df.x, lambda df: df.x.cumsum(), - lambda df: df.groupby(["x", "y"]).count(), + lambda df: df.groupby(["x", "y"]).count(split_out=1), lambda df: df.loc[50:75], ] for f in exprs: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 636efcd9e4..652997a2f3 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2763,13 +2763,13 @@ async def test_get_task_duration(c, s, a, b): @gen_cluster(client=True) async def test_default_task_duration_splits(c, s, a, b): - """Ensure that the default task durations for shuffle split tasks are, by default, + """Ensure that the default task durations for shuffle split tasks are aligned with the task names of dask.dask """ pd = pytest.importorskip("pandas") dd = pytest.importorskip("dask.dataframe") - # We don't care about the actual computation here but we'll schedule one anyhow to + # We don't care about the actual computation here but we'll schedule one anyway to # verify that we're looking for the correct key npart = 10 df = dd.from_pandas(pd.DataFrame({"A": range(100), "B": 1}), npartitions=npart) diff --git a/pyproject.toml b/pyproject.toml index f42887c490..e41227120b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,8 @@ addopts = ''' -p no:legacypath''' filterwarnings = [ "error", + # https://github.com/dask-contrib/dask-expr/issues/945 + '''ignore:dask_expr does not support the DataFrameIOFunction''', '''ignore:Please use `dok_matrix` from the `scipy\.sparse` namespace, the `scipy\.sparse\.dok` namespace is deprecated.:DeprecationWarning''', '''ignore:elementwise comparison failed. this will raise an error in the future:DeprecationWarning''', '''ignore:unclosed