dask/tests/test_distributed.py::test_map_partitions_df_input failing in CI #10455

Closed
jrbourbeau opened this issue Aug 21, 2023 · 3 comments · Fixed by dask/distributed#8125
Labels
bug: Something is broken
tests: Unit tests and/or continuous integration

Comments

@jrbourbeau (Member)

It looks like dask/tests/test_distributed.py::test_map_partitions_df_input is failing on main with

_____________________________________________________________________________________________________________________ test_map_partitions_df_input _____________________________________________________________________________________________________________________

>   return get_worker_plugin().add_partition(
        input,
        input_partition,
        spec=DataFrameShuffleSpec(
            id=id, npartitions=npartitions, column=column, parts_out=parts_out
        ),
    )

../distributed/distributed/shuffle/_shuffle.py:61:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../distributed/distributed/shuffle/_worker_plugin.py:139: in add_partition
    shuffle = self.get_or_create_shuffle(spec)
../distributed/distributed/shuffle/_worker_plugin.py:342: in get_or_create_shuffle
    return sync(
../distributed/distributed/utils.py:426: in sync
    raise exc.with_traceback(tb)
../distributed/distributed/utils.py:399: in f
    result = yield future
../../../mambaforge/envs/dask-py39/lib/python3.9/site-packages/tornado/gen.py:767: in run
    value = future.result()
../distributed/distributed/shuffle/_worker_plugin.py:219: in _get_or_create_shuffle
    shuffle = await self._refresh_shuffle(
../distributed/distributed/shuffle/_worker_plugin.py:260: in _refresh_shuffle
    result = await self.worker.scheduler.shuffle_get_or_create(
../distributed/distributed/core.py:1356: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
../distributed/distributed/core.py:1140: in send_recv
    raise exc.with_traceback(tb)
../distributed/distributed/core.py:910: in _handle_comm
    result = handler(**msg)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   return self.get(spec.id, worker)
E   AttributeError: 'ToPickle' object has no attribute 'id'

../distributed/distributed/shuffle/_scheduler_plugin.py:123: AttributeError
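The `AttributeError` above is consistent with the scheduler receiving the shuffle spec still wrapped in a serialization wrapper rather than the unwrapped `DataFrameShuffleSpec`. The sketch below uses hypothetical stand-in classes (`ToPickleSketch`, `SpecSketch`) to illustrate the failure mode: a plain wrapper stores the real object in an attribute and does not forward attribute access, so any handler that expects the unwrapped object fails.

```python
# Hypothetical stand-ins, illustrative only -- not distributed's real classes.

class ToPickleSketch:
    """Minimal wrapper that holds the real object without forwarding attributes."""
    def __init__(self, data):
        self.data = data


class SpecSketch:
    """Stand-in for a shuffle spec carrying an ``id`` field."""
    def __init__(self, id):
        self.id = id


wrapped = ToPickleSketch(SpecSketch(id="53a3..."))

try:
    # A handler expecting the unwrapped spec would do this and fail:
    wrapped.id
except AttributeError as e:
    print(e)  # 'ToPickleSketch' object has no attribute 'id'

# Unwrapping first restores access to the spec's fields:
assert wrapped.data.id == "53a3..."
```

Under this reading, the fix would be to unwrap the payload before the scheduler-side handler touches `spec.id`.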

The above exception was the direct cause of the following exception:

    def test_map_partitions_df_input():
        """
        Check that map_partitions can handle a delayed
        partition of a dataframe input
        """
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")

        def f(d, a):
            assert isinstance(d, pd.DataFrame)
            assert isinstance(a, pd.DataFrame)
            return d

        def main():
            item_df = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=1)
            ddf = item_df.to_delayed()[0].persist()
            merged_df = dd.from_pandas(pd.DataFrame({"b": range(10)}), npartitions=1)

            # Notice, we include a shuffle in order to trigger a complex culling
            merged_df = merged_df.shuffle(on="b")

            merged_df.map_partitions(
                f, ddf, meta=merged_df, enforce_metadata=False
            ).compute()

        with distributed.LocalCluster(
            scheduler_port=0,
            # Explicitly disabling dashboard to prevent related warnings being
            # elevated to errors until `bokeh=3` is fully supported.
            # See https://github.com/dask/dask/issues/9686 and
            # https://github.com/dask/distributed/issues/7173 for details.
            dashboard_address=":0",
            scheduler_kwargs={"dashboard": False},
            asynchronous=False,
            n_workers=1,
            nthreads=1,
            processes=False,
        ) as cluster:
            with distributed.Client(cluster, asynchronous=False):
>               main()

dask/tests/test_distributed.py:812:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/tests/test_distributed.py:794: in main
    merged_df.map_partitions(
dask/base.py:381: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:666: in compute
    results = schedule(dsk, keys, **kwargs)
../distributed/distributed/client.py:3278: in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
../distributed/distributed/client.py:2403: in gather
    return self.sync(
../distributed/distributed/utils.py:359: in sync
    return sync(
../distributed/distributed/utils.py:426: in sync
    raise exc.with_traceback(tb)
../distributed/distributed/utils.py:399: in f
    result = yield future
../../../mambaforge/envs/dask-py39/lib/python3.9/site-packages/tornado/gen.py:767: in run
    value = future.result()
../distributed/distributed/client.py:2265: in _gather
    raise exception.with_traceback(traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   raise RuntimeError(f"shuffle_transfer failed during shuffle {id}") from e
E   RuntimeError: shuffle_transfer failed during shuffle 53a3327765b9171a0d39308129460022

../distributed/distributed/shuffle/_shuffle.py:71: RuntimeError
-------------------------------------------------------------------------------------------------------------------------- Captured log call ---------------------------------------------------------------------------------------------------------------------------
INFO     distributed.http.proxy:proxy.py:71 To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO     distributed.scheduler:scheduler.py:1678 State start
INFO     distributed.diskutils:diskutils.py:242 Found stale lock file and directory '/var/folders/h0/_w6tz8jd3b9bk6w7d_xpg9t40000gn/T/dask-scratch-space/worker-kppax9dz', purging
INFO     distributed.diskutils:diskutils.py:242 Found stale lock file and directory '/var/folders/h0/_w6tz8jd3b9bk6w7d_xpg9t40000gn/T/dask-scratch-space/worker-0m1ndgli', purging
INFO     distributed.diskutils:diskutils.py:242 Found stale lock file and directory '/var/folders/h0/_w6tz8jd3b9bk6w7d_xpg9t40000gn/T/dask-scratch-space/worker-30tj5zw3', purging
INFO     distributed.diskutils:diskutils.py:242 Found stale lock file and directory '/var/folders/h0/_w6tz8jd3b9bk6w7d_xpg9t40000gn/T/dask-scratch-space/worker-vgjpr7rh', purging
INFO     distributed.scheduler:scheduler.py:3927   Scheduler at: inproc://192.168.1.173/21440/1
INFO     distributed.scheduler:scheduler.py:3942   dashboard at:  http://192.168.1.173:51640/status
INFO     distributed.worker:worker.py:1442       Start worker at: inproc://192.168.1.173/21440/4
INFO     distributed.worker:worker.py:1443          Listening to:        inproc192.168.1.173
INFO     distributed.worker:worker.py:1446           Worker name:                          0
INFO     distributed.worker:worker.py:1448          dashboard at:        192.168.1.173:51641
INFO     distributed.worker:worker.py:1449 Waiting to connect to: inproc://192.168.1.173/21440/1
INFO     distributed.worker:worker.py:1450 -------------------------------------------------
INFO     distributed.worker:worker.py:1451               Threads:                          8
INFO     distributed.worker:worker.py:1453                Memory:                  16.00 GiB
INFO     distributed.worker:worker.py:1457       Local Directory: /var/folders/h0/_w6tz8jd3b9bk6w7d_xpg9t40000gn/T/dask-scratch-space/worker-n7dya865
INFO     distributed.worker:worker.py:1171 -------------------------------------------------
INFO     distributed.scheduler:scheduler.py:4278 Register worker <WorkerState 'inproc://192.168.1.173/21440/4', name: 0, status: init, memory: 0, processing: 0>
INFO     distributed.scheduler:scheduler.py:5657 Starting worker compute stream, inproc://192.168.1.173/21440/4
INFO     distributed.core:core.py:959 Starting established connection to inproc://192.168.1.173/21440/5
INFO     distributed.worker:worker.py:1885 Starting Worker plugin shuffle
INFO     distributed.worker:worker.py:1238         Registered to: inproc://192.168.1.173/21440/1
INFO     distributed.worker:worker.py:1239 -------------------------------------------------
INFO     distributed.core:core.py:959 Starting established connection to inproc://192.168.1.173/21440/1
INFO     distributed.scheduler:scheduler.py:5415 Receive client connection: Client-3b2d3fd0-406b-11ee-93c0-02471348e20c
INFO     distributed.core:core.py:959 Starting established connection to inproc://192.168.1.173/21440/6
ERROR    distributed.core:core.py:922 Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/james/projects/dask/distributed/distributed/core.py", line 910, in _handle_comm
    result = handler(**msg)
  File "/Users/james/projects/dask/distributed/distributed/shuffle/_scheduler_plugin.py", line 123, in get_or_create
    return self.get(spec.id, worker)
AttributeError: 'ToPickle' object has no attribute 'id'
WARNING  distributed.worker:worker.py:2357 Compute Failed
Key:       ('shuffle-transfer-53a3327765b9171a0d39308129460022', 0)
Function:  shuffle_transfer
args:      (   b  _partitions
0  0            0
1  1            0
2  2            0
3  3            0
4  4            0
5  5            0
6  6            0
7  7            0
8  8            0
9  9            0, '53a3327765b9171a0d39308129460022', 0, 1, '_partitions', {0})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 53a3327765b9171a0d39308129460022')"

INFO     distributed.scheduler:scheduler.py:5459 Remove client Client-3b2d3fd0-406b-11ee-93c0-02471348e20c
INFO     distributed.core:core.py:984 Received 'close-stream' from inproc://192.168.1.173/21440/6; closing.
INFO     distributed.scheduler:scheduler.py:5459 Remove client Client-3b2d3fd0-406b-11ee-93c0-02471348e20c
INFO     distributed.scheduler:scheduler.py:5451 Close client connection: Client-3b2d3fd0-406b-11ee-93c0-02471348e20c
INFO     distributed.worker:worker.py:1546 Stopping worker at inproc://192.168.1.173/21440/4. Reason: worker-close
INFO     distributed.core:core.py:984 Received 'close-stream' from inproc://192.168.1.173/21440/5; closing.
INFO     distributed.scheduler:scheduler.py:4961 Remove worker <WorkerState 'inproc://192.168.1.173/21440/4', name: 0, status: closing, memory: 0, processing: 0> (stimulus_id='handle-worker-cleanup-1692653979.5407639')
INFO     distributed.scheduler:scheduler.py:5059 Lost all workers
INFO     distributed.core:core.py:969 Connection to inproc://192.168.1.173/21440/1 has been closed.
INFO     distributed.scheduler:scheduler.py:4000 Scheduler closing due to unknown reason...
INFO     distributed.scheduler:scheduler.py:4018 Scheduler closing all comms

cc @hendrikmakait

@jrbourbeau added the bug and tests labels on Aug 21, 2023
@jrbourbeau (Member, Author)

Looks like dask/distributed#8096 is the change where this test starts failing. cc @wence- in case you have bandwidth to look as well

@hendrikmakait (Member)

I'm taking a look.

@jrbourbeau (Member, Author)

Thank you @hendrikmakait 👍
