Fix flaky test_flaky_connect_recover_with_retry (#8556) #2189

GitHub Actions / Unit Test Results failed Mar 6, 2024 in 0s

1 error, 121 failed, 110 skipped, 3 816 passed in 9h 57m 8s

    27 files     27 suites   9h 57m 8s ⏱️
 4 048 tests:  3 816 ✅ passed    110 💤 skipped    121 ❌ failed   1 🔥 errored
50 829 runs:  47 386 ✅ passed  2 296 💤 skipped  1 146 ❌ failed   1 🔥 errored

Results for commit b1597b6.

Annotations

Check warning on line 0 in distributed.dashboard.tests.test_scheduler_bokeh

All 10 runs failed: test_shuffling (distributed.dashboard.tests.test_scheduler_bokeh)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:41813', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35907', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:32885', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
    async def test_shuffling(c, s, a, b):
        pytest.importorskip("pyarrow")
        dd = pytest.importorskip("dask.dataframe")
        ss = Shuffling(s)
    
>       df = dask.datasets.timeseries(
            start="2000-01-01",
            end="2000-02-01",
            dtypes={"x": float, "y": float},
            freq="10 s",
        )

distributed/dashboard/tests/test_scheduler_bokeh.py:1342: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/datasets.py:63: in timeseries
    return make_timeseries(
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x7fd799177040>
args = None
meta =                    x         y
timestamp                     
2000-01-01 -0.536104  0.533165
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-03 00:00:00'), Timestamp('2000-01-04 00:00:00'), Timestamp('2000-01-05 00:00:00'), Timestamp('2000-01-06 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00')], 1338619382), ([Timestamp('2000-01-02 00:00:00...01-06 00:00:00')], 731757020), ([Timestamp('2000-01-06 00:00:00'), Timestamp('2000-01-07 00:00:00')], 791785803), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>
lengths = {31}, i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:4999: UserWarning
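
The warning above also states the remedy: dask-expr ignores the `DataFrameIOFunction` protocol and only enables column projection when `func` itself accepts a `columns=` keyword. A minimal sketch of a conforming partition builder (the `make_part` function and its `(size, seed)` spec are hypothetical, not dask's `MakeDataframePart`):

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    def make_part(spec, columns=None):
        # Having `columns=` in the signature (with a default) is what lets
        # dask-expr's from_map push column projection into the I/O function,
        # replacing the old DataFrameIOFunction protocol.
        size, seed = spec
        rng = np.random.default_rng(seed)
        df = pd.DataFrame({"x": rng.random(size), "y": rng.random(size)})
        return df[list(columns)] if columns is not None else df

    ddf = dd.from_map(make_part, [(10, 0), (10, 1)])
    ddf[["x"]].compute()  # projection can now reach make_part via columns=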

Check warning on line 0 in distributed.deploy.tests.test_subprocess

1 out of 13 runs failed: test_scale_up_and_down (distributed.deploy.tests.test_subprocess)

artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 1s]
Raw output
RuntimeError: Cluster failed to start: Expecting value: line 1 column 1 (char 0)
self = SubprocessCluster(SubprocessCluster, '<Not Connected>', workers=0, threads=0, memory=0 B)

    async def _start(self):
        while self.status == Status.starting:
            await asyncio.sleep(0.01)
        if self.status == Status.running:
            return
        if self.status == Status.closed:
            raise ValueError("Cluster is closed")
    
        self._lock = asyncio.Lock()
        self.status = Status.starting
    
        if self.scheduler_spec is None:
            try:
                import distributed.dashboard  # noqa: F401
            except ImportError:
                pass
            else:
                options = {"dashboard": True}
            self.scheduler_spec = {"cls": Scheduler, "options": options}
    
        try:
            # Check if scheduler has already been created by a subclass
            if self.scheduler is None:
                cls = self.scheduler_spec["cls"]
                if isinstance(cls, str):
                    cls = import_term(cls)
                self.scheduler = cls(**self.scheduler_spec.get("options", {}))
>               self.scheduler = await self.scheduler

distributed/deploy/spec.py:325: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/deploy/spec.py:74: in _
    await self.start()
distributed/deploy/subprocess.py:40: in start
    await self._start()
distributed/deploy/subprocess.py:116: in _start
    identity = json.load(f)
../../../miniconda3/envs/dask-distributed/lib/python3.11/json/__init__.py:293: in load
    return loads(fp.read(),
../../../miniconda3/envs/dask-distributed/lib/python3.11/json/__init__.py:346: in loads
    return _default_decoder.decode(s)
../../../miniconda3/envs/dask-distributed/lib/python3.11/json/decoder.py:337: in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <json.decoder.JSONDecoder object at 0x7f6887576010>, s = '', idx = 0

    def raw_decode(self, s, idx=0):
        """Decode a JSON document from ``s`` (a ``str`` beginning with
        a JSON document) and return a 2-tuple of the Python
        representation and the index in ``s`` where the document ended.
    
        This can be used to decode a JSON document from a string that may
        have extraneous data at the end.
    
        """
        try:
            obj, end = self.scan_once(s, idx)
        except StopIteration as err:
>           raise JSONDecodeError("Expecting value", s, err.value) from None
E           json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

../../../miniconda3/envs/dask-distributed/lib/python3.11/json/decoder.py:355: JSONDecodeError

The above exception was the direct cause of the following exception:

    @pytest.mark.skipif(WINDOWS, reason="distributed#7434")
    @gen_test()
    async def test_scale_up_and_down():
>       async with SubprocessCluster(
            n_workers=0,
            silence_logs=False,
            dashboard_address=":0",
            asynchronous=True,
        ) as cluster:

distributed/deploy/tests/test_subprocess.py:50: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/deploy/spec.py:473: in __aenter__
    await self
distributed/deploy/spec.py:418: in _
    await self._start()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = SubprocessCluster(SubprocessCluster, '<Not Connected>', workers=0, threads=0, memory=0 B)

    async def _start(self):
        while self.status == Status.starting:
            await asyncio.sleep(0.01)
        if self.status == Status.running:
            return
        if self.status == Status.closed:
            raise ValueError("Cluster is closed")
    
        self._lock = asyncio.Lock()
        self.status = Status.starting
    
        if self.scheduler_spec is None:
            try:
                import distributed.dashboard  # noqa: F401
            except ImportError:
                pass
            else:
                options = {"dashboard": True}
            self.scheduler_spec = {"cls": Scheduler, "options": options}
    
        try:
            # Check if scheduler has already been created by a subclass
            if self.scheduler is None:
                cls = self.scheduler_spec["cls"]
                if isinstance(cls, str):
                    cls = import_term(cls)
                self.scheduler = cls(**self.scheduler_spec.get("options", {}))
                self.scheduler = await self.scheduler
            self.scheduler_comm = rpc(
                getattr(self.scheduler, "external_address", None)
                or self.scheduler.address,
                connection_args=self.security.get_connection_args("client"),
            )
            await super()._start()
        except Exception as e:  # pragma: no cover
            self.status = Status.failed
            await self._close()
>           raise RuntimeError(f"Cluster failed to start: {e}") from e
E           RuntimeError: Cluster failed to start: Expecting value: line 1 column 1 (char 0)

distributed/deploy/spec.py:335: RuntimeError
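
The traceback shows `json.load` failing on an empty string: the scheduler subprocess has created its identity file but has not yet written the payload when `SubprocessCluster._start` reads it. A possible hardening, sketched here as a hypothetical helper (not the current `distributed.deploy.subprocess` code), is to poll until the file parses:

    import asyncio
    import json

    async def read_identity(path: str, timeout: float = 30.0) -> dict:
        # Retry while the file is missing or still empty/partially written;
        # re-raise once the deadline passes so startup failures stay visible.
        deadline = asyncio.get_running_loop().time() + timeout
        while True:
            try:
                with open(path) as f:
                    return json.load(f)
            except (FileNotFoundError, json.JSONDecodeError):
                if asyncio.get_running_loop().time() > deadline:
                    raise
                await asyncio.sleep(0.05)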

Check warning on line 0 in distributed.deploy.tests.test_subprocess

1 out of 13 runs failed: test_subprocess_cluster_does_not_depend_on_logging (distributed.deploy.tests.test_subprocess)

artifacts/ubuntu-latest-mindeps-numpy-notci1/pytest.xml [took 3s]
Raw output
psutil.NoSuchProcess: psutil.NoSuchProcess process no longer exists (pid=4916)
@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
    @pytest.mark.slow
    @gen_test()
    async def test_subprocess_cluster_does_not_depend_on_logging():
        with new_config_file(
            {"distributed": {"logging": {"distributed": logging.CRITICAL + 1}}}
        ):
            async with SubprocessCluster(
                asynchronous=True, dashboard_address=":0"
            ) as cluster, Client(cluster, asynchronous=True) as client:
                result = await client.submit(lambda x: x + 1, 10)
>               assert result == 11

distributed/deploy/tests/test_subprocess.py:88: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/deploy/cluster.py:549: in __aexit__
    await self._close()
distributed/deploy/spec.py:448: in _close
    await self._correct_state()
distributed/deploy/spec.py:359: in _correct_state_internal
    await asyncio.gather(*tasks)
distributed/deploy/subprocess.py:50: in close
    child.kill()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = psutil.Process(pid=4916, status='terminated', started='18:04:55')
args = (), kwargs = {}

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        if not self.is_running():
>           raise NoSuchProcess(self.pid, self._name)
E           psutil.NoSuchProcess: psutil.NoSuchProcess process no longer exists (pid=4916)

../../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/psutil/__init__.py:288: NoSuchProcess
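
Here the worker subprocess has already exited by the time `close()` walks its children, so `child.kill()` raises. psutil documents this race; a tolerant shutdown treats an already-gone process as success. A hypothetical sketch (not the actual `distributed.deploy.subprocess` close path):

    import psutil

    def kill_process_tree(pid: int) -> None:
        # A child can exit on its own between enumeration and kill();
        # NoSuchProcess then simply means there is nothing left to do.
        try:
            children = psutil.Process(pid).children(recursive=True)
        except psutil.NoSuchProcess:
            return
        for child in children:
            try:
                child.kill()
            except psutil.NoSuchProcess:
                pass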

Check warning on line 0 in distributed.protocol.tests.test_highlevelgraph

10 out of 11 runs failed: test_dataframe_annotations (distributed.protocol.tests.test_highlevelgraph)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 2s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 2s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 2s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 2s]
Raw output
AssertionError: assert 0 == 5
 +  where 0 = <distributed.protocol.tests.test_highlevelgraph.ExampleAnnotationPlugin object at 0x7fd790dda7d0>.retry_matches
 +  and   5 = <dask_expr.expr.Series: expr=Shuffle(416dd28)['a'] + Shuffle(416dd28)['b']>.npartitions
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42023', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44421', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:43541', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_dataframe_annotations(c, s, a, b):
        retries = 5
        plugin = ExampleAnnotationPlugin(retries=retries)
        s.add_plugin(plugin)
    
        assert plugin in s.plugins.values()
    
        df = dd.from_pandas(
            pd.DataFrame(
                {"a": np.arange(10, dtype=int), "b": np.arange(10, 0, -1, dtype=float)}
            ),
            npartitions=5,
        )
        df = df.shuffle("a", max_branch=2)
        acol = df["a"]
        bcol = df["b"]
    
        with dask.annotate(retries=retries):
            df = acol + bcol
    
        with dask.config.set(optimization__fuse__active=False):
            rdf = await c.compute(df)
    
        assert rdf.dtypes == np.float64
        assert (rdf == 10.0).all()
    
        # There is an annotation match per partition (i.e. task)
>       assert plugin.retry_matches == df.npartitions
E       AssertionError: assert 0 == 5
E        +  where 0 = <distributed.protocol.tests.test_highlevelgraph.ExampleAnnotationPlugin object at 0x7fd790dda7d0>.retry_matches
E        +  and   5 = <dask_expr.expr.Series: expr=Shuffle(416dd28)['a'] + Shuffle(416dd28)['b']>.npartitions

distributed/protocol/tests/test_highlevelgraph.py:186: AssertionError
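
`plugin.retry_matches == 0` says the `retries=5` annotation never reached the scheduler. Annotations are attached to `HighLevelGraph` layers, which this dask-expr-backed DataFrame evidently no longer carries through. For contrast, a minimal sketch with a collection that still builds a `HighLevelGraph` (dask.array here) shows where the annotation would normally sit:

    import dask
    import dask.array as da

    with dask.annotate(retries=5):
        x = da.ones(10, chunks=5) + 1

    hlg = x.__dask_graph__()  # HighLevelGraph
    for name, layer in hlg.layers.items():
        # Layers created inside the annotate block carry {'retries': 5}
        print(name, layer.annotations)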

Check warning on line 0 in distributed.shuffle.tests.test_graph

10 out of 11 runs failed: test_basic (distributed.shuffle.tests.test_graph)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 4s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 2s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 3s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 3s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 5s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 3s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
client = <Client: 'tcp://127.0.0.1:45521' processes=2 threads=2, memory=31.21 GiB>

    def test_basic(client):
>       df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")

distributed/shuffle/tests/test_graph.py:20: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x7fd79072f8e0>
args = None
meta =               name    id         x         y
timestamp                                   
2000-01-01  Ursula  1020 -0.504541  0.377014
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-31 00:00:00'), Timestamp('2000-03-01 00:00:00'), Timestamp('2000-03-31 00:00:00'), Timestamp('2000-04-30 00:00:00'), Timestamp('2000-05-30 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-31 00:00:00')], 1082284871), ([Timestamp('2000-01-31 00:00:00...-30 00:00:00')], 1251514339), ([Timestamp('2000-05-30 00:00:00'), Timestamp('2000-06-29 00:00:00')], 1644314248), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>
lengths = {12}, i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:4999: UserWarning

Check warning on line 0 in distributed.shuffle.tests.test_graph

10 out of 11 runs failed: test_raise_on_complex_numbers[csingle] (distributed.shuffle.tests.test_graph)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
Failed: DID NOT RAISE <class 'TypeError'>
dtype = 'csingle'

    @pytest.mark.parametrize("dtype", ["csingle", "cdouble", "clongdouble"])
    def test_raise_on_complex_numbers(dtype):
        df = dd.from_pandas(
            pd.DataFrame({"x": pd.array(range(10), dtype=dtype)}), npartitions=5
        )
>       with pytest.raises(
            TypeError, match=f"p2p does not support data of type '{df.x.dtype}'"
        ), dask.config.set({"dataframe.shuffle.method": "p2p"}):
E       Failed: DID NOT RAISE <class 'TypeError'>

distributed/shuffle/tests/test_graph.py:40: Failed
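
This test (and the cdouble/clongdouble, sparse-data, and column-name variants below) expects the p2p validation `TypeError` while the graph is being built; with dask-expr, expression construction is lazy, so the check evidently no longer fires at `.shuffle()` time. One possible adaptation, assuming the validation still runs once the shuffle actually materializes, is to assert at compute time instead:

    import dask
    import dask.dataframe as dd
    import pandas as pd
    import pytest

    df = dd.from_pandas(
        pd.DataFrame({"x": pd.array(range(10), dtype="csingle")}), npartitions=5
    )
    with dask.config.set({"dataframe.shuffle.method": "p2p"}):
        shuffled = df.shuffle("x")  # no longer raises eagerly under dask-expr
        with pytest.raises(TypeError, match="p2p does not support"):
            shuffled.compute()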

Check warning on line 0 in distributed.shuffle.tests.test_graph

10 out of 11 runs failed: test_raise_on_complex_numbers[cdouble] (distributed.shuffle.tests.test_graph)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
Failed: DID NOT RAISE <class 'TypeError'>
dtype = 'cdouble'

    @pytest.mark.parametrize("dtype", ["csingle", "cdouble", "clongdouble"])
    def test_raise_on_complex_numbers(dtype):
        df = dd.from_pandas(
            pd.DataFrame({"x": pd.array(range(10), dtype=dtype)}), npartitions=5
        )
>       with pytest.raises(
            TypeError, match=f"p2p does not support data of type '{df.x.dtype}'"
        ), dask.config.set({"dataframe.shuffle.method": "p2p"}):
E       Failed: DID NOT RAISE <class 'TypeError'>

distributed/shuffle/tests/test_graph.py:40: Failed

Check warning on line 0 in distributed.shuffle.tests.test_graph

10 out of 11 runs failed: test_raise_on_complex_numbers[clongdouble] (distributed.shuffle.tests.test_graph)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
Failed: DID NOT RAISE <class 'TypeError'>
dtype = 'clongdouble'

    @pytest.mark.parametrize("dtype", ["csingle", "cdouble", "clongdouble"])
    def test_raise_on_complex_numbers(dtype):
        df = dd.from_pandas(
            pd.DataFrame({"x": pd.array(range(10), dtype=dtype)}), npartitions=5
        )
>       with pytest.raises(
            TypeError, match=f"p2p does not support data of type '{df.x.dtype}'"
        ), dask.config.set({"dataframe.shuffle.method": "p2p"}):
E       Failed: DID NOT RAISE <class 'TypeError'>

distributed/shuffle/tests/test_graph.py:40: Failed

Check warning on line 0 in distributed.shuffle.tests.test_graph

10 out of 11 runs failed: test_raise_on_sparse_data (distributed.shuffle.tests.test_graph)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
Failed: DID NOT RAISE <class 'TypeError'>
def test_raise_on_sparse_data():
        df = dd.from_pandas(
            pd.DataFrame({"x": pd.array(range(10), dtype="Sparse[float64]")}), npartitions=5
        )
>       with pytest.raises(
            TypeError, match="p2p does not support sparse data"
        ), dask.config.set({"dataframe.shuffle.method": "p2p"}):
E       Failed: DID NOT RAISE <class 'TypeError'>

distributed/shuffle/tests/test_graph.py:68: Failed

Check warning on line 0 in distributed.shuffle.tests.test_graph

10 out of 11 runs failed: test_raise_on_non_string_column_name (distributed.shuffle.tests.test_graph)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
Failed: DID NOT RAISE <class 'TypeError'>
def test_raise_on_non_string_column_name():
        df = dd.from_pandas(pd.DataFrame({"a": range(10), 1: range(10)}), npartitions=5)
>       with pytest.raises(
            TypeError, match="p2p requires all column names to be str"
        ), dask.config.set({"dataframe.shuffle.method": "p2p"}):
E       Failed: DID NOT RAISE <class 'TypeError'>

distributed/shuffle/tests/test_graph.py:76: Failed

Check warning on line 0 in distributed.shuffle.tests.test_graph

10 out of 11 runs failed: test_basic_state (distributed.shuffle.tests.test_graph)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:36921', workers: 0, cores: 0, tasks: 0>
workers = (<Worker 'tcp://127.0.0.1:32971', name: 0, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>, <W... 0>, <Worker 'tcp://127.0.0.1:36343', name: 3, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>)

    @gen_cluster([("", 2)] * 4, client=True)
    async def test_basic_state(c, s, *workers):
>       df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")

distributed/shuffle/tests/test_graph.py:90: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x7fd7906b5780>
args = None
meta =              name   id         x         y
timestamp                                 
2000-01-01  Sarah  976 -0.605019  0.871453
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-31 00:00:00'), Timestamp('2000-03-01 00:00:00'), Timestamp('2000-03-31 00:00:00'), Timestamp('2000-04-30 00:00:00'), Timestamp('2000-05-30 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-31 00:00:00')], 1328139782), ([Timestamp('2000-01-31 00:00:00...-30 00:00:00')], 1174576846), ([Timestamp('2000-05-30 00:00:00'), Timestamp('2000-06-29 00:00:00')], 1705657007), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>
lengths = {12}, i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:4999: UserWarning

Check warning on line 0 in distributed.shuffle.tests.test_graph

10 out of 11 runs failed: test_multiple_linear (distributed.shuffle.tests.test_graph)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 4s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 2s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 3s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 3s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 5s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 3s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
client = <Client: 'tcp://127.0.0.1:32975' processes=2 threads=2, memory=31.21 GiB>

    def test_multiple_linear(client):
>       df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")

distributed/shuffle/tests/test_graph.py:114: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x7fd790ec8b80>
args = None
meta =                name   id         x         y
timestamp                                   
2000-01-01  Norbert  951 -0.124228 -0.332256
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-31 00:00:00'), Timestamp('2000-03-01 00:00:00'), Timestamp('2000-03-31 00:00:00'), Timestamp('2000-04-30 00:00:00'), Timestamp('2000-05-30 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-31 00:00:00')], 276392038), ([Timestamp('2000-01-31 00:00:00'...05-30 00:00:00')], 845862956), ([Timestamp('2000-05-30 00:00:00'), Timestamp('2000-06-29 00:00:00')], 247826290), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>
lengths = {12}, i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:4999: UserWarning

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

10 out of 11 runs failed: test_merge_unknown_to_unknown[idx-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34133', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35319', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:39487', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = 'idx', how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError
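
Under dask-expr, `result_graph.dask` here is a plain low-level dict rather than a `HighLevelGraph`, so `.layers` no longer exists. A defensive version of the check, reusing `result_graph` from the test above and assuming the layer classes are importable at these paths (adjust if they have moved):

    from distributed.shuffle._merge import HashJoinP2PLayer
    from distributed.shuffle._shuffle import P2PShuffleLayer

    graph = result_graph.__dask_graph__()
    layers = getattr(graph, "layers", {})  # plain dicts have no .layers
    has_p2p_layer = any(
        isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
        for layer in layers.values()
    )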

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

10 out of 11 runs failed: test_merge_unknown_to_unknown[idx-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40307', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39079', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:46561', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = 'idx', how = 'left'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

10 out of 11 runs failed: test_merge_unknown_to_unknown[idx-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:41925', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41271', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41589', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = 'idx', how = 'right'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

10 out of 11 runs failed: test_merge_unknown_to_unknown[idx-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33763', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:37407', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38709', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = 'idx', how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

10 out of 11 runs failed: test_merge_unknown_to_unknown[on1-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39255', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:42849', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:34299', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = ['idx'], how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

10 out of 11 runs failed: test_merge_unknown_to_unknown[on1-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35873', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39175', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:33097', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = ['idx'], how = 'left'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

10 out of 11 runs failed: test_merge_unknown_to_unknown[on1-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:36877', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:42109', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45299', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = ['idx'], how = 'right'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError
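
For orientation: every traceback in this report comes from distributed's gen_cluster test harness, which starts an in-process scheduler and two workers per test and injects them as s, a, and b, plus a connected client c when client=True. A minimal self-contained usage sketch (a hypothetical test, not one from this suite):

    from distributed.utils_test import gen_cluster

    @gen_cluster(client=True)
    async def test_roundtrip(c, s, a, b):
        # c: Client, s: Scheduler, a and b: in-process Workers
        future = c.submit(sum, [1, 2, 3])
        assert await future == 6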

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

10 out of 11 runs failed: test_merge_unknown_to_unknown[on1-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35629', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:36469', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45343', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = ['idx'], how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

10 out of 11 runs failed: test_merge_unknown_to_unknown[on2-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44839', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45497', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:33041', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = ['idx', 'k'], how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

10 out of 11 runs failed: test_merge_unknown_to_unknown[on2-left] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43729', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:33317', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:36101', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = ['idx', 'k'], how = 'left'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

10 out of 11 runs failed: test_merge_unknown_to_unknown[on2-right] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34905', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43165', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41741', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = ['idx', 'k'], how = 'right'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

10 out of 11 runs failed: test_merge_unknown_to_unknown[on2-outer] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40349', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41105', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:46039', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = ['idx', 'k'], how = 'outer'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge_column_and_index

github-actions / Unit Test Results

10 out of 11 runs failed: test_merge_unknown_to_unknown[on3-inner] (distributed.shuffle.tests.test_merge_column_and_index)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
AttributeError: 'dict' object has no attribute 'layers'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40717', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39885', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:45991', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
df_left =      k  v1
idx       
0    0   0
0    1   1
0    2   2
1    0   3
1    1   4
1    2   5
1    3   6
2    0   7
2    1  ...    0  37
9    1  38
9    2  39
9    3  40
9    4  41
9    5  42
9    6  43
10   0  44
10   1  45
10   2  46
10   3  47
df_right =      k  v1
idx       
0    0   0
0    1   1
0    2   2
0    3   3
1    0   4
1    1   5
2    0   6
2    1   7
2    2  ...    1  42
9    2  43
9    3  44
10   0  45
10   1  46
10   2  47
10   3  48
10   4  49
10   5  50
10   6  51
10   7  52
ddf_left_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
ddf_right_unknown = <dask_expr.expr.DataFrame: expr=ClearDivisions(frame=df)>
on = ['k', 'idx'], how = 'inner'

    @gen_cluster(client=True)
    async def test_merge_unknown_to_unknown(
        c,
        s,
        a,
        b,
        df_left,
        df_right,
        ddf_left_unknown,
        ddf_right_unknown,
        on,
        how,
    ):
        # Compute expected
        expected = df_left.merge(df_right, on=on, how=how)
    
        # Merge unknown to unknown
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
        if not any(
            isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
>           for layer in result_graph.dask.layers.values()
        ):
E       AttributeError: 'dict' object has no attribute 'layers'

distributed/shuffle/tests/test_merge_column_and_index.py:183: AttributeError
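
The failing IDs (on1-inner through on3-outer) together with the captured values (on = ['idx'], ['idx', 'k'], or ['k', 'idx']; how = 'inner', 'left', 'right', or 'outer') imply a cross-product of parametrized fixtures; the on1/on2/on3 numbering also suggests an on0 case outside this excerpt. A hypothetical reconstruction of that layout, listing only the values observed here; the actual fixtures live in distributed/shuffle/tests/test_merge_column_and_index.py:

    import pytest

    # Assumed fixture-based parametrization, inferred from the test IDs above
    @pytest.fixture(params=[["idx"], ["idx", "k"], ["k", "idx"]])
    def on(request):
        return request.param

    @pytest.fixture(params=["inner", "left", "right", "outer"])
    def how(request):
        return request.param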