Skip to content
GitHub Actions / Unit Test Results failed May 3, 2024 in 0s

1 error, 779 fail, 130 skipped, 1 029 pass in 14h 59m 31s

    18 files   -      9     18 suites   - 9   14h 59m 31s ⏱️ + 4h 48m 8s
 1 939 tests  -  2 112  1 029 ✅  -  2 923  130 💤 +   33    779 ❌ +  777  1 🔥 +1 
16 066 runs   - 35 721  8 013 ✅  - 41 757  788 💤  - 1 227  7 256 ❌ +7 254  9 🔥 +9 

Results for commit a990563. ± Comparison against earlier commit 458eb98.

Annotations

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_basic_merge[inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-e631d8eccef279025e42dc005ac2ec27
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33055', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:36: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-e631d8eccef279025e42dc005ac2ec27>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-e631d8eccef279025e42dc005ac2ec27

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_basic_merge[left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-dd447fb22719f3743c3b573b730bd7f3
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:46629', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:36: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-dd447fb22719f3743c3b573b730bd7f3>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-dd447fb22719f3743c3b573b730bd7f3

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_basic_merge[right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-9627eb8818358f60662639d9ff830989
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35525', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:36: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-9627eb8818358f60662639d9ff830989>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-9627eb8818358f60662639d9ff830989

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_basic_merge[outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-30c90adae378a16295147c29fbcc84b2
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43845', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:36: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-30c90adae378a16295147c29fbcc84b2>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-30c90adae378a16295147c29fbcc84b2

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_different_parameters (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-87e18f46c7bf5aab9cc44adadf08aab4
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37149', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40069', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:43173', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
        pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
        pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
        ddf1 = dd.from_pandas(pdf1, npartitions=5)
        ddf2 = dd.from_pandas(pdf2, npartitions=10)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            out = (
                ddf1.merge(ddf2, left_on="a", right_on="x")
                # Vary the number of output partitions for the shuffles of dd2
                .repartition(npartitions=20).merge(ddf2, left_on="b", right_on="x")
            )
        # Generate unique shuffle IDs if the input frame is the same but
        # parameters differ. Reusing shuffles in merges is dangerous because of the
        # required coordination and complexity introduced through dynamic clusters.
        assert sum(id_from_key(k) is not None for k in out.dask) == 4
>       result = await c.compute(out)

distributed/shuffle/tests/test_merge.py:126: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-87e18f46c7bf5aab9cc44adadf08aab4>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-87e18f46c7bf5aab9cc44adadf08aab4

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_same_parameters (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-39b72a493975779752b4b99d408b24df
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34933', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43809', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:42905', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
        pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
        pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
        ddf1 = dd.from_pandas(pdf1, npartitions=5)
        ddf2 = dd.from_pandas(pdf2, npartitions=10)
    
        # This performs two shuffles:
        #   * ddf1 is shuffled on `a`
        #   * ddf2 is shuffled on `x`
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            ddf3 = ddf1.merge(
                ddf2,
                left_on="a",
                right_on="x",
            )
    
        # This performs one shuffle:
        #   * ddf3 is shuffled on `b`
        # We can reuse the shuffle of dd2 on `x` from the previous merge.
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            out = ddf2.merge(
                ddf3,
                left_on="x",
                right_on="b",
            )
        # Generate unique shuffle IDs if the input frame is the same and all its
        # parameters match. Reusing shuffles in merges is dangerous because of the
        # required coordination and complexity introduced through dynamic clusters.
        assert sum(id_from_key(k) is not None for k in out.dask) == 4
>       result = await c.compute(out)

distributed/shuffle/tests/test_merge.py:163: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-39b72a493975779752b4b99d408b24df>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-39b72a493975779752b4b99d408b24df

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[True-inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40515', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-43dfb617454cb1aa42c8a3ccdcb16329>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[True-outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43597', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-af93dbc77b27048ec91353f09450d571>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[True-left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33601', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-10a1f67bc4de3d4535f8be22ca0a4a75>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[True-right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40417', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-6901874377222dc3d690d647fc7da03d>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[False-inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43005', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-43dfb617454cb1aa42c8a3ccdcb16329>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[False-outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38005', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-af93dbc77b27048ec91353f09450d571>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[False-left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42753', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-10a1f67bc4de3d4535f8be22ca0a4a75>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[False-right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42717', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-6901874377222dc3d690d647fc7da03d>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_by_multiple_columns[inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-2920fb1e84930799e8a936f9714a6850
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45825', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35315', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35547', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
>                       await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's an discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )

distributed/shuffle/tests/test_merge.py:292: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-2920fb1e84930799e8a936f9714a6850>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-2920fb1e84930799e8a936f9714a6850

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_by_multiple_columns[outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-a048d71f41fa2758c0995d78b57592da
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45587', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45663', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38155', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
>                       await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's an discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )

distributed/shuffle/tests/test_merge.py:292: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-a048d71f41fa2758c0995d78b57592da>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-a048d71f41fa2758c0995d78b57592da

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_by_multiple_columns[left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-67e9d3bfb23f954a3d341a4de5282f79
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38023', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44615', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35981', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
>                       await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's an discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )

distributed/shuffle/tests/test_merge.py:292: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-67e9d3bfb23f954a3d341a4de5282f79>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-67e9d3bfb23f954a3d341a4de5282f79

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_by_multiple_columns[right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-3bf5fa457df0231bc7228874bedf69e6
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44039', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:36643', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:39061', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
>                       await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's an discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )

distributed/shuffle/tests/test_merge.py:292: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-3bf5fa457df0231bc7228874bedf69e6>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-3bf5fa457df0231bc7228874bedf69e6

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_index_merge_p2p[inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-8bee82abf05338bfacf545f2eacbe0bc
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40853', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:46655', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35891', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-8bee82abf05338bfacf545f2eacbe0bc>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-8bee82abf05338bfacf545f2eacbe0bc

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_index_merge_p2p[left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-75316c9754a970eda681ec3d7b465f99
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34807', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40081', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41465', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-75316c9754a970eda681ec3d7b465f99>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-75316c9754a970eda681ec3d7b465f99

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_index_merge_p2p[right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-20004be3884007b7f7661bebdc70ffa7
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38833', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41531', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35113', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-20004be3884007b7f7661bebdc70ffa7>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-20004be3884007b7f7661bebdc70ffa7

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_index_merge_p2p[outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-c39a92473466263a55511f2615156ee2
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33269', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:32799', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:46789', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-c39a92473466263a55511f2615156ee2>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-c39a92473466263a55511f2615156ee2

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_with_npartitions[4] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-cb5dcf43490a0323d9216c8645e554f6
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35153', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:34587', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38797', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 4

    @pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
    @gen_cluster(client=True)
    async def test_merge_with_npartitions(c, s, a, b, npartitions):
        pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
    
        left = dd.from_pandas(pdf, npartitions=10)
        right = dd.from_pandas(pdf, npartitions=5)
    
        expected = pdf.merge(pdf)
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           result = await c.compute(left.merge(right, npartitions=npartitions))

distributed/shuffle/tests/test_merge.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-cb5dcf43490a0323d9216c8645e554f6>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-cb5dcf43490a0323d9216c8645e554f6

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_with_npartitions[5] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-d6634a86dd53665a78003094935e45c4
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44341', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40267', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:40909', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 5

    @pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
    @gen_cluster(client=True)
    async def test_merge_with_npartitions(c, s, a, b, npartitions):
        pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
    
        left = dd.from_pandas(pdf, npartitions=10)
        right = dd.from_pandas(pdf, npartitions=5)
    
        expected = pdf.merge(pdf)
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           result = await c.compute(left.merge(right, npartitions=npartitions))

distributed/shuffle/tests/test_merge.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-d6634a86dd53665a78003094935e45c4>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-d6634a86dd53665a78003094935e45c4

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_with_npartitions[10] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-9b5d2873c7675e6fbccafd7cf2117ac1
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44311', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41895', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:37915', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 10

    @pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
    @gen_cluster(client=True)
    async def test_merge_with_npartitions(c, s, a, b, npartitions):
        pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
    
        left = dd.from_pandas(pdf, npartitions=10)
        right = dd.from_pandas(pdf, npartitions=5)
    
        expected = pdf.merge(pdf)
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           result = await c.compute(left.merge(right, npartitions=npartitions))

distributed/shuffle/tests/test_merge.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-9b5d2873c7675e6fbccafd7cf2117ac1>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-9b5d2873c7675e6fbccafd7cf2117ac1

distributed/client.py:342: CancelledError