Make P2P more configurable #8469 (Draft)

wants to merge 1 commit into base: main

Commit: Configuration (ce97cce)
GitHub Actions / Unit Test Results failed Jan 19, 2024 in 0s

88 fail, 109 skipped, 3 763 pass in 9h 30m 51s

27 files, 27 suites, 9h 30m 51s ⏱️
3 960 tests: 3 763 ✅ passed, 109 💤 skipped, 88 ❌ failed
49 809 runs: 46 634 ✅ passed, 2 289 💤 skipped, 886 ❌ failed

Results for commit ce97cce.

Annotations

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_minimal_version (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53376', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer

    @gen_cluster(client=True)
    async def test_minimal_version(c, s, a, b):
        no_pyarrow_ctx = (
            mock.patch.dict("sys.modules", {"pyarrow": None})
            if pa is not None
            else contextlib.nullcontext()
        )
        with no_pyarrow_ctx:
            A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
            a = dd.repartition(A, [0, 4, 5])
    
            B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
            b = dd.repartition(B, [0, 2, 5])
    
            with pytest.raises(
                ModuleNotFoundError, match="requires pyarrow"
            ), dask.config.set({"dataframe.shuffle.method": "p2p"}):
>               await c.compute(dd.merge(a, b, left_on="x", right_on="z"))

distributed\shuffle\tests\test_merge.py:72: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
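
Every KeyError: 'disk' in this run has the same shape: hash_join_p2p (distributed/shuffle/_merge.py:110) reads the config key distributed.p2p.disk, dask.config.get resolves distributed and p2p, and then fails on the missing disk leaf. In test_minimal_version this lookup even preempts the ModuleNotFoundError the test expects, since it runs before any pyarrow check. A minimal sketch of the lookup behaviour, assuming the option is registered only under distributed.p2p.storage.disk (the key the tests set) and not under distributed.p2p.disk:

    import dask

    # Assumption: only "distributed.p2p.storage.disk" exists; no
    # "distributed.p2p.disk" leaf is registered anywhere.
    with dask.config.set({"distributed.p2p.storage.disk": True}):
        print(dask.config.get("distributed.p2p.storage.disk"))  # True

        # Without a default, dask.config.get splits the key on ".", walks
        # the nested config dict, and fails at the last step, exactly as
        # in the traceback above.
        try:
            dask.config.get("distributed.p2p.disk")
        except KeyError as err:
            print(err)  # 'disk'

Passing a default (dask.config.get("distributed.p2p.disk", default=True)) would silence the error rather than fix the key mismatch.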

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_basic_merge[inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53388', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
>       joined = hash_join(a, "y", b, "y", how)

distributed\shuffle\tests\test_merge.py:84: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_basic_merge[left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53400', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
>       joined = hash_join(a, "y", b, "y", how)

distributed\shuffle\tests\test_merge.py:84: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_basic_merge[right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53412', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
>       joined = hash_join(a, "y", b, "y", how)

distributed\shuffle\tests\test_merge.py:84: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_basic_merge[outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53424', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
>       joined = hash_join(a, "y", b, "y", how)

distributed\shuffle\tests\test_merge.py:84: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_different_parameters (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53436', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53437', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53440', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
        pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
        pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
        ddf1 = dd.from_pandas(pdf1, npartitions=5)
        ddf2 = dd.from_pandas(pdf2, npartitions=10)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            out = (
>               ddf1.merge(ddf2, left_on="a", right_on="x")
                # Vary the number of output partitions for the shuffles of dd2
                .repartition(npartitions=20).merge(ddf2, left_on="b", right_on="x")
            )

distributed\shuffle\tests\test_merge.py:120: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
    return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
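
As this traceback shows, the failure surfaces at graph-construction time for a plain DataFrame merge once the p2p method is selected; the ddf1.merge call itself raises, before any compute. A sketch of the triggering pattern from these tests, assuming a distributed client is running (the tests use gen_cluster):

    import dask
    import dask.dataframe as dd
    import pandas as pd

    ddf1 = dd.from_pandas(pd.DataFrame({"a": range(100), "b": range(0, 200, 2)}), npartitions=5)
    ddf2 = dd.from_pandas(pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50}), npartitions=10)

    with dask.config.set({"dataframe.shuffle.method": "p2p"}):
        # Fails while building the graph: merge -> hash_join -> hash_join_p2p
        # -> dask.config.get("distributed.p2p.disk") -> KeyError: 'disk'
        ddf1.merge(ddf2, left_on="a", right_on="x")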

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_same_parameters (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53448', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53449', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53452', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
        pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
        pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
        ddf1 = dd.from_pandas(pdf1, npartitions=5)
        ddf2 = dd.from_pandas(pdf2, npartitions=10)
    
        # This performs two shuffles:
        #   * ddf1 is shuffled on `a`
        #   * ddf2 is shuffled on `x`
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           ddf3 = ddf1.merge(
                ddf2,
                left_on="a",
                right_on="x",
            )

distributed\shuffle\tests\test_merge.py:146: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
    return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge[True-inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53460', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
>           joined = dd.merge(a, b, on="y", how=how)

distributed\shuffle\tests\test_merge.py:190: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
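
The parametrized test_merge cases make the key mismatch explicit: the test sets distributed.p2p.storage.disk (see the dask.config.set call in the test body above), while hash_join_p2p reads distributed.p2p.disk. One possible reconciliation, assuming distributed.p2p.storage.disk is the name this branch intends (renaming the key the tests set is the alternative):

    # Hypothetical one-line change at distributed/shuffle/_merge.py:110,
    # reading the same key the tests configure:
    disk: bool = dask.config.get("distributed.p2p.storage.disk")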

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge[True-outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53477', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
>           joined = dd.merge(a, b, on="y", how=how)

distributed\shuffle\tests\test_merge.py:190: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge[True-left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53494', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
>           joined = dd.merge(a, b, on="y", how=how)

distributed\shuffle\tests\test_merge.py:190: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge[True-right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53511', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
>           joined = dd.merge(a, b, on="y", how=how)

distributed\shuffle\tests\test_merge.py:190: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge[False-inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53528', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
>           joined = dd.merge(a, b, on="y", how=how)

distributed\shuffle\tests\test_merge.py:190: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge[False-outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53545', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
>           joined = dd.merge(a, b, on="y", how=how)

distributed\shuffle\tests\test_merge.py:190: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge[False-left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53562', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
>           joined = dd.merge(a, b, on="y", how=how)

distributed\shuffle\tests\test_merge.py:190: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge[False-right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53580', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.storage.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
>           joined = dd.merge(a, b, on="y", how=how)

distributed\shuffle\tests\test_merge.py:190: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge_by_multiple_columns[inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 2s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53597', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53598', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53601', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
                        await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pdr.join(pdl, how=how)
                    assert_eq(
                        await c.compute(ddr.join(ddl, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pd.merge(
                        pdl, pdr, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pd.merge(
                        pdr, pdl, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddr,
                                ddl,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    # hash join
                    list_eq(
                        await c.compute(
>                           dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_on="a",
                                right_on="d",
                            )
                        ),
                        pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
                    )

distributed\shuffle\tests\test_merge.py:396: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
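If bare `dask.config.get` calls (without `default=`) are meant to keep working, the new key also needs a registered default somewhere in the schema. A hedged sketch using `dask.config.update_defaults`; the nesting mirrors the `distributed.p2p.storage.disk` key these tests set, and the `True` default is an assumption:

```python
import dask

# Register a process-wide default for the new key so that a bare
# dask.config.get(...) no longer raises KeyError. In the real codebase this
# would live in distributed's bundled YAML defaults; update_defaults is used
# here only to keep the sketch self-contained.
dask.config.update_defaults({"distributed": {"p2p": {"storage": {"disk": True}}}})

assert dask.config.get("distributed.p2p.storage.disk") is True
```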

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge_by_multiple_columns[outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 2s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53615', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53616', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53619', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
                        await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pdr.join(pdl, how=how)
                    assert_eq(
                        await c.compute(ddr.join(ddl, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pd.merge(
                        pdl, pdr, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pd.merge(
                        pdr, pdl, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddr,
                                ddl,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    # hash join
                    list_eq(
                        await c.compute(
>                           dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_on="a",
                                right_on="d",
                            )
                        ),
                        pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
                    )

distributed\shuffle\tests\test_merge.py:396: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge_by_multiple_columns[left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 2s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53632', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53633', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53636', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
                        await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pdr.join(pdl, how=how)
                    assert_eq(
                        await c.compute(ddr.join(ddl, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pd.merge(
                        pdl, pdr, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pd.merge(
                        pdr, pdl, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddr,
                                ddl,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    # hash join
                    list_eq(
                        await c.compute(
>                           dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_on="a",
                                right_on="d",
                            )
                        ),
                        pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
                    )

distributed\shuffle\tests\test_merge.py:396: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_merge_by_multiple_columns[right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 2s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53649', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53650', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53653', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
                        await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pdr.join(pdl, how=how)
                    assert_eq(
                        await c.compute(ddr.join(ddl, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pd.merge(
                        pdl, pdr, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    expected = pd.merge(
                        pdr, pdl, how=how, left_index=True, right_index=True
                    )
                    assert_eq(
                        await c.compute(
                            dd.merge(
                                ddr,
                                ddl,
                                how=how,
                                left_index=True,
                                right_index=True,
                            )
                        ),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )
    
                    # hash join
                    list_eq(
                        await c.compute(
>                           dd.merge(
                                ddl,
                                ddr,
                                how=how,
                                left_on="a",
                                right_on="d",
                            )
                        ),
                        pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
                    )

distributed\shuffle\tests\test_merge.py:396: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_index_merge_p2p[inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53666', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53667', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53670', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed\shuffle\tests\test_merge.py:471: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
    return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    def get(
        key: str,
        default: Any = no_default,
        config: dict = config,
        override_with: Any = None,
    ) -> Any:
        """
        Get elements from global config
    
        If ``override_with`` is not None this value will be passed straight back.
        Useful for getting kwarg defaults from Dask config.
    
        Use '.' for nested access
    
        Examples
        --------
        >>> from dask import config
        >>> config.get('foo')  # doctest: +SKIP
        {'x': 1, 'y': 2}
    
        >>> config.get('foo.x')  # doctest: +SKIP
        1
    
        >>> config.get('foo.x.y', default=123)  # doctest: +SKIP
        123
    
        >>> config.get('foo.y', override_with=None)  # doctest: +SKIP
        2
    
        >>> config.get('foo.y', override_with=3)  # doctest: +SKIP
        3
    
        See Also
        --------
        dask.config.set
        """
        if override_with is not None:
            return override_with
        keys = key.split(".")
        result = config
        for k in keys:
            k = canonical_name(k, result)
            try:
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError

Check warning on line 0 in distributed.shuffle.tests.test_merge

github-actions / Unit Test Results

All 11 runs failed: test_index_merge_p2p[left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53678', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53679', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53682', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed\shuffle\tests\test_merge.py:471: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
    return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    [dask.config.get source elided; identical to the first failure above]
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
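The same KeyError in test_index_merge_p2p[left]. A package normally avoids this by registering the key's default before any bare dask.config.get call; distributed does so when it loads its distributed.yaml defaults. A hedged sketch of that registration, where the nested shape and the value True are assumptions rather than the schema this PR actually ships:

    import dask

    # Register an assumed default so the bare lookup in hash_join_p2p
    # succeeds. Dict shape and value are illustrative only.
    dask.config.update_defaults({"distributed": {"p2p": {"disk": True}}})
    assert dask.config.get("distributed.p2p.disk") is True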

All 11 runs failed: test_index_merge_p2p[right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53691', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53692', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53695', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed\shuffle\tests\test_merge.py:471: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
    return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    [dask.config.get source elided; identical to the first failure above]
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
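The traceback also shows why the error names only the last segment: config.get splits the key on "." and indexes nested dicts one segment at a time, so the first missing segment ('disk' here) escapes as a bare KeyError. A small self-contained repro of that walk on a throwaway dict, independent of the global config:

    # "disk" deliberately absent, mirroring the CI environments above.
    cfg = {"distributed": {"p2p": {}}}

    result = cfg
    try:
        # Same segment-by-segment lookup as in dask.config.get above.
        for k in "distributed.p2p.disk".split("."):
            result = result[k]
    except KeyError as e:
        print(e)  # 'disk' -- matching the failures above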

All 11 runs failed: test_index_merge_p2p[outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53703', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53704', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:53707', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed\shuffle\tests\test_merge.py:471: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\core.py:6010: in merge
    return merge(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    [dask.config.get source elided; identical to the first failure above]
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
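Until the default is registered, the failing tests could pin the key explicitly next to the shuffle method they already set. A sketch of that workaround (the True value is again an assumption for illustration):

    import dask

    # Set the missing key alongside the shuffle method the tests already
    # configure, so the bare lookup in hash_join_p2p finds a value.
    with dask.config.set(
        {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": True}
    ):
        pass  # run the p2p merge under test here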

All 11 runs failed: test_merge_does_not_deadlock_if_worker_joins (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
KeyError: 'disk'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:53715', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:53716', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>

    @mock.patch(
        "distributed.shuffle._worker_plugin._ShuffleRunManager",
        LimitedGetOrCreateShuffleRunManager,
    )
    @gen_cluster(client=True, nthreads=[("", 1)])
    async def test_merge_does_not_deadlock_if_worker_joins(c, s, a):
        """Regression test for https://github.com/dask/distributed/issues/8411"""
        pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
        pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
        df1 = dd.from_pandas(pdf1, npartitions=10)
        df2 = dd.from_pandas(pdf2, npartitions=20)
    
        run_manager_A = a.plugins["shuffle"].shuffle_runs
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           joined = dd.merge(df1, df2, left_on="a", right_on="x")

distributed\shuffle\tests\test_merge.py:516: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:745: in merge
    return hash_join(
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\utils.py:259: in wrapper
    return func(*args, **kwargs)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\dataframe\multi.py:356: in hash_join
    return hash_join_p2p(
distributed\shuffle\_merge.py:110: in hash_join_p2p
    disk: bool = dask.config.get("distributed.p2p.disk")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

key = 'distributed.p2p.disk', default = <no_default>
config = {'admin': {'traceback': {'shorten': ['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiproces...comm': {'compression': False, 'default-scheme': 'tcp', 'offload': '10MiB', 'require-encryption': None, ...}, ...}, ...}
override_with = None

    [dask.config.get source elided; identical to the first failure above]
>               result = result[k]
E               KeyError: 'disk'

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\site-packages\dask\config.py:568: KeyError
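The config.get docstring excerpted in the first failure notes that override_with is "useful for getting kwarg defaults from Dask config": a non-None argument is returned outright and the config is never consulted. A sketch of that pattern applied to a hypothetical disk keyword (the function name and keyword are invented for illustration; this is not the signature hash_join_p2p actually has):

    import dask

    def p2p_disk_setting(disk=None):
        # A non-None argument wins; None falls through to the config
        # lookup, with default= guarding against the missing key.
        return dask.config.get(
            "distributed.p2p.disk", default=True, override_with=disk
        )

    print(p2p_disk_setting())       # config/default value (True here)
    print(p2p_disk_setting(False))  # explicit argument wins -> False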

1 out of 13 runs failed: test_mixing_clients_same_scheduler (distributed.tests.test_client)

artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: inc-64456584d8a2e7176e2cd177efaa15f2
s = <Scheduler 'tcp://127.0.0.1:52715', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:52716', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:52719', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster()
    async def test_mixing_clients_same_scheduler(s, a, b):
        async with Client(s.address, asynchronous=True) as c1, Client(
            s.address, asynchronous=True
        ) as c2:
            future = c1.submit(inc, 1)
>           assert await c2.submit(inc, future) == 3

distributed\tests\test_client.py:6232: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Future: cancelled, key: inc-64456584d8a2e7176e2cd177efaa15f2>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: inc-64456584d8a2e7176e2cd177efaa15f2

distributed\client.py:336: CancelledError
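This final failure is unrelated to the config key: test_mixing_clients_same_scheduler flaked once on Windows with a CancelledError, which a future raises from result() after the scheduler has released its key. A minimal synchronous sketch of the pattern under test, assuming a local cluster and that client.scheduler.address points both clients at the same scheduler:

    from distributed import Client

    def inc(x):
        return x + 1

    # Two clients on one scheduler: a future owned by c1 is used as input
    # to a task submitted by c2. If c1 releases the key before c2's task
    # runs, the dependent future can surface CancelledError, as recorded
    # in the raw output above.
    with Client() as c1, Client(c1.scheduler.address) as c2:
        future = c1.submit(inc, 1)
        print(c2.submit(inc, future).result())  # 3 when the key is still held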