Shuffle use pyarrow more broadly #13448

GitHub Actions / Unit Test Results failed Mar 21, 2024 in 0s

1 error, 256 failed, 109 skipped, 3 689 passed in 11h 19m 23s

    29 files  +    1      29 suites  +1   11h 19m 23s ⏱️ + 1h 0m 41s
 4 055 tests +   13   3 689 ✅ +    1    109 💤 + 12  256 ❌ ±0  1 🔥 ±0 
54 889 runs  +9 561  52 142 ✅ +8 835  2 410 💤 +726  336 ❌ ±0  1 🔥 ±0 

Results for commit 9826d68. ± Comparison against earlier commit 8a279f1.

Annotations

1 out of 12 runs failed: test_combo_of_layer_types (distributed.protocol.tests.test_highlevelgraph)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 1s]
Raw output
RuntimeError: shuffle_barrier failed during shuffle f7bd1538f1208f622fb4f1078c1fd439
>   return get_worker_plugin().barrier(id, run_ids)

distributed/shuffle/_shuffle.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_worker_plugin.py:395: in barrier
    result = sync(self.worker.loop, self._barrier, shuffle_id, run_ids)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:359: in _barrier
    shuffle_run = await self.shuffle_runs.get_most_recent(shuffle_id, run_ids)
distributed/shuffle/_worker_plugin.py:178: in get_most_recent
    return await self.get_with_run_id(shuffle_id=shuffle_id, run_id=max(run_ids))
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37019', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44007', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:40885', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_combo_of_layer_types(c, s, a, b):
        """Check pack/unpack of a HLG that has everything!"""
    
        def add(x, y, z, extra_arg):
            return x + y + z + extra_arg
    
        y = c.submit(lambda x: x, 2)
        z = c.submit(lambda x: x, 3)
        xx = await c.submit(lambda x: x + 1, y)
        x = da.blockwise(
            add,
            "x",
            da.zeros((3,), chunks=(1,)),
            "x",
            da.ones((3,), chunks=(1,)),
            "x",
            y,
            None,
            concatenate=False,
            dtype=int,
            extra_arg=z,
        )
    
        df = dd.from_pandas(pd.DataFrame({"a": np.arange(3)}), npartitions=3)
        df = df.shuffle("a")
        df = df["a"].to_dask_array()
    
        res = x.sum() + df.sum()
>       res = await c.compute(res, optimize_graph=False)

distributed/protocol/tests/test_highlevelgraph.py:49: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"shuffle_barrier failed during shuffle {id}") from e
E   RuntimeError: shuffle_barrier failed during shuffle f7bd1538f1208f622fb4f1078c1fd439

distributed/shuffle/_shuffle.py:109: RuntimeError

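Every failure in this report reduces to the same root cause shown in the traceback above: `split_by_partition` calls `pa.compute.unique(t[column]).sort()`, but `Array.sort()` only exists in newer pyarrow releases, so the minimum-dependency (`mindeps-pandas`) environment raises `AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'`. Below is a minimal sketch of a version-independent way to get the sorted unique partition values; the helper name, column name, and NumPy-side sort are illustrative assumptions, not the fix applied in this PR.

```python
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc


def sorted_unique_partitions(t: pa.Table, column: str) -> np.ndarray:
    """Return the sorted unique values of ``t[column]`` as a NumPy array.

    Avoids ``Array.sort()``, which older pyarrow versions (such as the one
    pinned in the mindeps CI environment) do not provide.
    """
    unique = pc.unique(t[column])       # pyarrow Array of distinct values
    return np.sort(np.asarray(unique))  # sort on the NumPy side, independent of pyarrow version


# Toy usage (hypothetical data, not taken from the failing tests):
table = pa.table({"_partitions": pa.array([2, 0, 1, 0, 2], type=pa.int8())})
print(sorted_unique_partitions(table, "_partitions"))  # [0 1 2]
```

Sorting the materialized NumPy array sidesteps the pyarrow API difference entirely, at the cost of one extra copy of the (small) array of unique values.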

1 out of 12 runs failed: test_shuffle (distributed.protocol.tests.test_highlevelgraph)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 1s]
Raw output
RuntimeError: P2P shuffling e12e7552a318f750ad0e6fb6b0964768 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35409', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35411', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:34139', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_shuffle(c, s, a, b):
        """Check pack/unpack of a shuffled dataframe"""
    
        df = dd.from_pandas(
            pd.DataFrame(
                {"a": np.arange(10, dtype=int), "b": np.arange(10, 0, -1, dtype=float)}
            ),
            npartitions=5,
        )
        df = df.shuffle("a", max_branch=2)
        df = df["a"] + df["b"]
>       res = await c.compute(df, optimize_graph=False)

distributed/protocol/tests/test_highlevelgraph.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling e12e7552a318f750ad0e6fb6b0964768 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

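If keeping the `Array.sort()` fast path were desirable, a guarded variant could fall back to explicit sort indices on older pyarrow. This is a hedged sketch only: the 12.0.0 cut-off for `Array.sort()` is an assumption, and `sorted_unique` is a hypothetical helper, not code from this PR.

```python
from packaging.version import Version

import pyarrow as pa
import pyarrow.compute as pc

# Assumption: Array.sort() appeared around pyarrow 12; adjust if the actual
# minimum version differs.
_HAS_ARRAY_SORT = Version(pa.__version__) >= Version("12.0.0")


def sorted_unique(values: pa.ChunkedArray) -> pa.Array:
    """Unique values of ``values`` in ascending order, on any supported pyarrow."""
    unique = pc.unique(values)
    if _HAS_ARRAY_SORT:
        return unique.sort()  # only available on recent pyarrow
    # Fallback for old pyarrow: compute sort indices explicitly and take them.
    return unique.take(pc.array_sort_indices(unique))
```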

1 out of 12 runs failed: test_dataframe_annotations (distributed.protocol.tests.test_highlevelgraph)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 1s]
Raw output
RuntimeError: shuffle_barrier failed during shuffle e12e7552a318f750ad0e6fb6b0964768
>   return get_worker_plugin().barrier(id, run_ids)

distributed/shuffle/_shuffle.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_worker_plugin.py:395: in barrier
    result = sync(self.worker.loop, self._barrier, shuffle_id, run_ids)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:359: in _barrier
    shuffle_run = await self.shuffle_runs.get_most_recent(shuffle_id, run_ids)
distributed/shuffle/_worker_plugin.py:178: in get_most_recent
    return await self.get_with_run_id(shuffle_id=shuffle_id, run_id=max(run_ids))
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:314: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
distributed/shuffle/_worker_plugin.py:369: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:314: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
distributed/shuffle/_worker_plugin.py:369: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:314: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
distributed/shuffle/_worker_plugin.py:369: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:314: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
distributed/shuffle/_worker_plugin.py:369: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:36047', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41729', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41011', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_dataframe_annotations(c, s, a, b):
        retries = 5
        plugin = ExampleAnnotationPlugin(retries=retries)
        s.add_plugin(plugin)
    
        assert plugin in s.plugins.values()
    
        df = dd.from_pandas(
            pd.DataFrame(
                {"a": np.arange(10, dtype=int), "b": np.arange(10, 0, -1, dtype=float)}
            ),
            npartitions=5,
        )
        df = df.shuffle("a", max_branch=2)
        acol = df["a"]
        bcol = df["b"]
    
        ctx = contextlib.nullcontext()
        if dd._dask_expr_enabled():
            ctx = pytest.warns(
                UserWarning, match="Annotations will be ignored when using query-planning"
            )
    
        with dask.annotate(retries=retries), ctx:
            df = acol + bcol
    
        with dask.config.set(optimization__fuse__active=False):
>           rdf = await c.compute(df)

distributed/protocol/tests/test_highlevelgraph.py:188: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"shuffle_barrier failed during shuffle {id}") from e
E   RuntimeError: shuffle_barrier failed during shuffle e12e7552a318f750ad0e6fb6b0964768

distributed/shuffle/_shuffle.py:109: RuntimeError

1 out of 12 runs failed: test_basic_state (distributed.shuffle.tests.test_graph)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 0f8ee10eb0dcacc7aa2bd83fd38fd8e8 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43807', workers: 0, cores: 0, tasks: 0>
workers = (<Worker 'tcp://127.0.0.1:46825', name: 0, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>, <W... 0>, <Worker 'tcp://127.0.0.1:40493', name: 3, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>)
df = Dask DataFrame Structure:
                  name     id        x        y
npartitions=12                              ...         ...    ...      ...      ...
2000-12-26         ...    ...      ...      ...
Dask Name: assign, 4 graph layers
shuffled = Dask DataFrame Structure:
                  name     id        x        y
npartitions=12                              ...  ...      ...      ...
                   ...    ...      ...      ...
Dask Name: drop_by_shallow_copy, 9 graph layers
exts = [<ShuffleWorkerPlugin, worker='tcp://127.0.0.1:46825', closed=True>, <ShuffleWorkerPlugin, worker='tcp://127.0.0.1:408...ugin, worker='tcp://127.0.0.1:37109', closed=True>, <ShuffleWorkerPlugin, worker='tcp://127.0.0.1:40493', closed=True>]

    @gen_cluster([("", 2)] * 4, client=True)
    async def test_basic_state(c, s, *workers):
        df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
        df["name"] = df["name"].astype("string[python]")
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            shuffled = df.shuffle("id")
    
        exts = [w.extensions["shuffle"] for w in workers]
        for ext in exts:
            assert not ext.shuffle_runs._active_runs
    
        f = c.compute(shuffled)
        # TODO this is a bad/pointless test. the `f.done()` is necessary in case the shuffle is really fast.
        # To test state more thoroughly, we'd need a way to 'stop the world' at various stages. Like have the
        # scheduler pause everything when the barrier is reached. Not sure yet how to implement that.
        while (
            not all(len(ext.shuffle_runs._active_runs) == 1 for ext in exts)
            and not f.done()
        ):
            await asyncio.sleep(0.1)
    
>       await f

distributed/shuffle/tests/test_graph.py:104: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 0f8ee10eb0dcacc7aa2bd83fd38fd8e8 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

1 out of 12 runs failed: test_multiple_linear (distributed.shuffle.tests.test_graph)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 97ff38af178ba531f16e3cf048b76801 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

client = <Client: 'tcp://127.0.0.1:46829' processes=2 threads=2, memory=31.21 GiB>

    def test_multiple_linear(client):
        df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
        df["name"] = df["name"].astype("string[python]")
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            s1 = df.shuffle("id")
        s1["x"] = s1["x"] + 1
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            s2 = s1.shuffle("x")
    
        with dask.config.set({"dataframe.shuffle.method": "tasks"}):
            expected = df.assign(x=lambda df: df.x + 1).shuffle("x")
        # TODO eventually test for fusion between s1's unpacks, the `+1`, and s2's `transfer`s
    
>       dd.utils.assert_eq(
            s2,
            expected,
            scheduler=client,
        )

distributed/shuffle/tests/test_graph.py:121: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/dask/dataframe/utils.py:582: in assert_eq
    a = _check_dask(
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/dask/dataframe/utils.py:474: in _check_dask
    result = dsk.compute(scheduler=scheduler)
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/dask/base.py:375: in compute
    (result,) = compute(self, traverse=False, **kwargs)
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/dask/base.py:661: in compute
    results = schedule(dsk, keys, **kwargs)
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 97ff38af178ba531f16e3cf048b76801 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

1 out of 12 runs failed: test_merge_known_to_unknown[idx-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 25b78fd6076303e33b69312a3ef0d5af failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35697', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39471', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:33877', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = 'idx', how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 25b78fd6076303e33b69312a3ef0d5af failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

1 out of 12 runs failed: test_merge_known_to_unknown[idx-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 86dcb55f2a9ca18c01478ed870d65482 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44171', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44253', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:44717', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = 'idx', how = 'left'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 86dcb55f2a9ca18c01478ed870d65482 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

1 out of 12 runs failed: test_merge_known_to_unknown[idx-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 916f88c41a22afedd82cc570d03c2f5f failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37459', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:38987', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:37135', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = 'idx', how = 'right'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 916f88c41a22afedd82cc570d03c2f5f failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

1 out of 12 runs failed: test_merge_known_to_unknown[idx-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 68867ec0f9409fcc244a21841be8af2c failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42841', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:42093', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:42073', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = 'idx', how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 68867ec0f9409fcc244a21841be8af2c failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

1 out of 12 runs failed: test_merge_known_to_unknown[on1-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling c6c9c9560889727dbff3b2fcca0e8bfa failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:314: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
distributed/shuffle/_worker_plugin.py:369: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:314: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
distributed/shuffle/_worker_plugin.py:369: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:41501', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41221', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35815', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['idx'], how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling c6c9c9560889727dbff3b2fcca0e8bfa failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

1 out of 12 runs failed: test_merge_known_to_unknown[on1-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 282d1e63f65ba045e948e02c676a704a failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33619', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:46683', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45269', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['idx'], how = 'left'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 282d1e63f65ba045e948e02c676a704a failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError
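
The failing call at distributed/shuffle/_shuffle.py:372 relies on pyarrow's Array.sort(), a method that only exists in newer pyarrow releases; the mindeps environment in which these runs fail presumably pins an older pyarrow that lacks it. A minimal sketch of a version-agnostic way to compute the sorted unique partition values follows — the helper name is hypothetical and this is not the project's code, just an illustration under that assumption:

    import numpy as np
    import pyarrow as pa
    import pyarrow.compute as pc


    def sorted_unique_partitions(table: pa.Table, column: str) -> np.ndarray:
        """Sorted unique values of ``column`` without relying on Array.sort()."""
        uniques = pc.unique(table[column])   # accepts Array and ChunkedArray inputs
        order = pc.sort_indices(uniques)     # available on older pyarrow, unlike Array.sort()
        return np.array(uniques.take(order))
        # Equivalently: np.sort(np.asarray(pc.unique(table[column])))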

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on1-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 285e368dbebdf42a92f6943a124e8adf failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44587', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41925', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:40861', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['idx'], how = 'right'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 285e368dbebdf42a92f6943a124e8adf failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError
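
Each failure ends with the same wrapper error: the worker re-raises the pyarrow AttributeError as a RuntimeError via explicit exception chaining, which is what produces the "The above exception was the direct cause of the following exception" banner in the pytest output. A small self-contained illustration of that pattern — the function and the shuffle id below are placeholders, not the project's code:

    def transfer(shuffle_id: str) -> None:
        try:
            # stand-in for the failing pyarrow call inside the shuffle run
            raise AttributeError(
                "'pyarrow.lib.Int8Array' object has no attribute 'sort'"
            )
        except Exception as e:
            raise RuntimeError(
                f"P2P shuffling {shuffle_id} failed during transfer phase"
            ) from e

    try:
        transfer("example-shuffle-id")
    except RuntimeError as err:
        assert isinstance(err.__cause__, AttributeError)  # the chained original error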

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on1-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 64bb1dff2def0434ccb1e6f14a0f159a failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42031', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41227', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:43663', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['idx'], how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 64bb1dff2def0434ccb1e6f14a0f159a failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on2-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling dff12ec99a068fca6e78b4ddabf5d783 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35519', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43053', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:39943', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['idx', 'k'], how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling dff12ec99a068fca6e78b4ddabf5d783 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on2-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling c2e892523155d27ff6cde48770f416cb failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33809', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43463', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:36023', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['idx', 'k'], how = 'left'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling c2e892523155d27ff6cde48770f416cb failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on2-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 7b8a4ce1d7e977fd78cd26357d5bba48 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:46083', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35397', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:33361', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['idx', 'k'], how = 'right'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 7b8a4ce1d7e977fd78cd26357d5bba48 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on2-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling f02bbaabec65f298b84e126bc9c5fbef failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:314: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
distributed/shuffle/_worker_plugin.py:369: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37543', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45771', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:46243', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['idx', 'k'], how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling f02bbaabec65f298b84e126bc9c5fbef failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on3-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 45ab3f4d20418016ca6f56f20f0e97eb failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33561', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45575', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:42447', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['k', 'idx'], how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 45ab3f4d20418016ca6f56f20f0e97eb failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on3-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling c74adfab29b69ba1f8a9c212cfd01b90 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42789', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45863', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:43039', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['k', 'idx'], how = 'left'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling c74adfab29b69ba1f8a9c212cfd01b90 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on3-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 609265113f8c8fc98c8a6ed03f018523 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35993', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:36587', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:33905', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['k', 'idx'], how = 'right'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 609265113f8c8fc98c8a6ed03f018523 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_known_to_unknown[on3-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling d68ecfc2cf4748dbc406168911e0ea66 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:41949', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35439', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:36633', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = ['k', 'idx'], how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_known_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:129: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling d68ecfc2cf4748dbc406168911e0ea66 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError
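
The failures annotated on this page all bottom out in the same call at distributed/shuffle/_shuffle.py:372: pa.compute.unique() returns a plain pyarrow Array, and Array.sort() only exists on relatively recent pyarrow releases, so the ubuntu-latest-mindeps-pandas-notci1 job (pinned to the minimum supported pyarrow) raises the AttributeError seen above, which is then re-raised on the sending worker as the "failed during transfer phase" RuntimeError. A minimal sketch of the incompatibility, assuming an older pyarrow where Array.sort() is missing (values and dtype chosen to mirror the Int8Array in the traceback, not taken from the test data):

    import pyarrow as pa
    import pyarrow.compute as pc

    # pc.unique() on a column collapses it to a plain Array (here Int8Array).
    arr = pc.unique(pa.array([3, 1, 2, 1], type=pa.int8()))
    try:
        arr.sort()          # available on recent pyarrow only
    except AttributeError:  # the mindeps job takes this branch
        print("Array.sort() is missing on this pyarrow version")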

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_unknown_to_known[idx-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 25b78fd6076303e33b69312a3ef0d5af failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37835', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35969', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:36723', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = 'idx', how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_known(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:154: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 25b78fd6076303e33b69312a3ef0d5af failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_unknown_to_known[idx-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 86dcb55f2a9ca18c01478ed870d65482 failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:46617', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:38301', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38343', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = 'idx', how = 'left'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_known(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:154: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 86dcb55f2a9ca18c01478ed870d65482 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_unknown_to_known[idx-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 916f88c41a22afedd82cc570d03c2f5f failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:344: in add_partition
    sync(self._loop, self._write_to_comm, shards)
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/shuffle/_core.py:255: in _write_to_comm
    await self._comm_buffer.write(data)
distributed/shuffle/_buffer.py:195: in write
    raise self._exception
distributed/shuffle/_buffer.py:113: in process
    await self._process(id, shards)
distributed/utils.py:832: in wrapper
    return await func(*args, **kwargs)
distributed/shuffle/_comms.py:72: in _process
    await self.send(address, shards)
distributed/shuffle/_core.py:226: in send
    return await retry(
distributed/utils_comm.py:424: in retry
    return await coro()
distributed/shuffle/_core.py:204: in _send
    return await self.rpc(address).shuffle_receive(
distributed/core.py:1395: in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:1179: in send_recv
    raise exc.with_traceback(tb)
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:314: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
distributed/shuffle/_worker_plugin.py:369: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38821', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:33741', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41275', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = 'idx', how = 'right'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_known(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:154: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 916f88c41a22afedd82cc570d03c2f5f failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

@github-actions github-actions / Unit Test Results

1 out of 12 runs failed: test_merge_unknown_to_known[idx-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/ubuntu-latest-mindeps-pandas-notci1/pytest.xml [took 2s]
Raw output
RuntimeError: P2P shuffling 68867ec0f9409fcc244a21841be8af2c failed during transfer phase
>   yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:345: in add_partition
    shuffle_run = self.get_or_create_shuffle(spec)
distributed/shuffle/_worker_plugin.py:415: in get_or_create_shuffle
    return sync(
distributed/utils.py:434: in sync
    raise error
distributed/utils.py:408: in f
    result = yield future
../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:735: in run
    value = future.result()
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:314: in shuffle_receive
    shuffle_run = await self._get_shuffle_run(shuffle_id, run_id)
distributed/shuffle/_worker_plugin.py:369: in _get_shuffle_run
    return await self.shuffle_runs.get_with_run_id(
distributed/shuffle/_worker_plugin.py:128: in get_with_run_id
    raise shuffle_run._exception
distributed/shuffle/_worker_plugin.py:154: in get_or_create
    raise shuffle_run._exception
distributed/core.py:970: in _handle_comm
    result = await result
distributed/shuffle/_worker_plugin.py:315: in shuffle_receive
    await shuffle_run.receive(data)
distributed/shuffle/_core.py:309: in receive
    await self._receive(data)
distributed/shuffle/_shuffle.py:499: in _receive
    groups = await self.offload(self._repartition_buffers, filtered)
distributed/shuffle/_core.py:238: in offload
    return await run_in_executor_with_context(
distributed/utils.py:1540: in run_in_executor_with_context
    return await loop.run_in_executor(
../../../miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
distributed/utils.py:1541: in <lambda>
    executor, lambda: context.run(func, *args, **kwargs)
distributed/shuffle/_shuffle.py:510: in _repartition_buffers
    groups = split_by_partition(table, self.column, self.drop_column)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   partitions = np.array(pa.compute.unique(t[column]).sort())
E   AttributeError: 'pyarrow.lib.Int8Array' object has no attribute 'sort'

distributed/shuffle/_shuffle.py:372: AttributeError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33725', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:42435', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41803', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = Dask DataFrame Structure:
                    k     v1
npartitions=10              
                int64  int64
     ...   ...    ...
                  ...    ...
                  ...    ...
Dask Name: repartition-dataframe, 1 graph layer
ddf_right = Dask DataFrame Structure:
                    k     v1
npartitions=10              
0               int64  int64
1    ...   ...    ...
10                ...    ...
11                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
on = 'idx', how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_known(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Perform merge
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right, on=on, how=how)
>       result = await c.compute(result_graph)

distributed/shuffle/tests/test_merge_column_and_index.py:154: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:336: in _result
    raise exc.with_traceback(tb)
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
../../../miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:137: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E   RuntimeError: P2P shuffling 68867ec0f9409fcc244a21841be8af2c failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError
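
Because every parametrization above fails in the same pa.compute.unique(...).sort() expression, one version-tolerant way to keep the mindeps job green is to sort via sort_indices() + take(), which older pyarrow also supports. The following is only a hedged sketch under that assumption, not the patch under review: sorted_unique_partitions is a hypothetical helper name, "_partitions" is an illustrative column name, and the real split_by_partition code may differ.

    import numpy as np
    import pyarrow as pa
    import pyarrow.compute as pc

    def sorted_unique_partitions(table: pa.Table, column: str) -> np.ndarray:
        # pc.unique() collapses the (possibly chunked) column to a plain Array.
        uniques = pc.unique(table[column])
        try:
            uniques = uniques.sort()  # newer pyarrow only
        except AttributeError:
            # Fallback for older pyarrow: compute a sort permutation and
            # materialize it with take().
            uniques = uniques.take(pc.sort_indices(uniques))
        return np.asarray(uniques)

    # Example: partition ids stored as int8, matching the dtype in the failures.
    t = pa.table({"_partitions": pa.array([2, 0, 1, 0], type=pa.int8())})
    print(sorted_unique_partitions(t, "_partitions"))  # [0 1 2]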