Skip to content

Fix flaky test_flaky_connect_recover_with_retry (#8556) #2189

Fix flaky test_flaky_connect_recover_with_retry (#8556)

Fix flaky test_flaky_connect_recover_with_retry (#8556) #2189

GitHub Actions / Unit Test Results failed Mar 6, 2024 in 0s

117 fail, 110 skipped, 3 821 pass in 9h 50m 43s

    27 files      27 suites   9h 50m 43s ⏱️
 4 048 tests  3 821 ✅   110 💤   117 ❌
50 810 runs  47 367 ✅ 2 301 💤 1 142 ❌

Results for commit b1597b6.

Annotations

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_minimal_version (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
RuntimeError: P2P shuffling ffaba2702f1cd74baf9575a827c62460 failed during transfer phase
from __future__ import annotations
    
    import abc
    import asyncio
    import contextlib
    import itertools
    import pickle
    import time
    from collections.abc import (
        Callable,
        Coroutine,
        Generator,
        Hashable,
        Iterable,
        Iterator,
        Sequence,
    )
    from concurrent.futures import ThreadPoolExecutor
    from dataclasses import dataclass, field
    from enum import Enum
    from functools import partial
    from pathlib import Path
    from typing import TYPE_CHECKING, Any, Generic, NewType, TypeVar, cast
    
    from tornado.ioloop import IOLoop
    
    import dask.config
    from dask.core import flatten
    from dask.typing import Key
    from dask.utils import parse_timedelta
    
    from distributed.core import PooledRPCCall
    from distributed.exceptions import Reschedule
    from distributed.metrics import context_meter, thread_time
    from distributed.protocol import to_serialize
    from distributed.shuffle._comms import CommShardsBuffer
    from distributed.shuffle._disk import DiskShardsBuffer
    from distributed.shuffle._exceptions import ShuffleClosedError
    from distributed.shuffle._limiter import ResourceLimiter
    from distributed.shuffle._memory import MemoryShardsBuffer
    from distributed.utils import run_in_executor_with_context, sync
    from distributed.utils_comm import retry
    
    if TYPE_CHECKING:
        # TODO import from typing (requires Python >=3.10)
        from typing_extensions import ParamSpec, TypeAlias
    
        _P = ParamSpec("_P")
    
        # circular dependencies
        from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
    
    ShuffleId = NewType("ShuffleId", str)
    NDIndex: TypeAlias = tuple[int, ...]
    
    
    _T_partition_id = TypeVar("_T_partition_id")
    _T_partition_type = TypeVar("_T_partition_type")
    _T = TypeVar("_T")
    
    
    class ShuffleRun(Generic[_T_partition_id, _T_partition_type]):
        id: ShuffleId
        run_id: int
        span_id: str | None
        local_address: str
        executor: ThreadPoolExecutor
        rpc: Callable[[str], PooledRPCCall]
        digest_metric: Callable[[Hashable, float], None]
        scheduler: PooledRPCCall
        closed: bool
        _disk_buffer: DiskShardsBuffer | MemoryShardsBuffer
        _comm_buffer: CommShardsBuffer
        received: set[_T_partition_id]
        total_recvd: int
        start_time: float
        _exception: Exception | None
        _closed_event: asyncio.Event
        _loop: IOLoop
    
        RETRY_COUNT: int
        RETRY_DELAY_MIN: float
        RETRY_DELAY_MAX: float
    
        def __init__(
            self,
            id: ShuffleId,
            run_id: int,
            span_id: str | None,
            local_address: str,
            directory: str,
            executor: ThreadPoolExecutor,
            rpc: Callable[[str], PooledRPCCall],
            digest_metric: Callable[[Hashable, float], None],
            scheduler: PooledRPCCall,
            memory_limiter_disk: ResourceLimiter,
            memory_limiter_comms: ResourceLimiter,
            disk: bool,
            loop: IOLoop,
        ):
            self.id = id
            self.run_id = run_id
            self.span_id = span_id
            self.local_address = local_address
            self.executor = executor
            self.rpc = rpc
            self.digest_metric = digest_metric
            self.scheduler = scheduler
            self.closed = False
    
            # Initialize buffers and start background tasks
            # Don't log metrics issued by the background tasks onto the dask task that
            # spawned this object
            with context_meter.clear_callbacks():
                with self._capture_metrics("background-disk"):
                    if disk:
                        self._disk_buffer = DiskShardsBuffer(
                            directory=directory,
                            read=self.read,
                            memory_limiter=memory_limiter_disk,
                        )
                    else:
                        self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)
    
                with self._capture_metrics("background-comms"):
                    self._comm_buffer = CommShardsBuffer(
                        send=self.send, memory_limiter=memory_limiter_comms
                    )
    
            # TODO: reduce number of connections to number of workers
            # MultiComm.max_connections = min(10, n_workers)
    
            self.transferred = False
            self.received = set()
            self.total_recvd = 0
            self.start_time = time.time()
            self._exception = None
            self._closed_event = asyncio.Event()
            self._loop = loop
    
            self.RETRY_COUNT = dask.config.get("distributed.p2p.comm.retry.count")
            self.RETRY_DELAY_MIN = parse_timedelta(
                dask.config.get("distributed.p2p.comm.retry.delay.min"), default="s"
            )
            self.RETRY_DELAY_MAX = parse_timedelta(
                dask.config.get("distributed.p2p.comm.retry.delay.max"), default="s"
            )
    
        def __repr__(self) -> str:
            return f"<{self.__class__.__name__}: id={self.id!r}, run_id={self.run_id!r}, local_address={self.local_address!r}, closed={self.closed!r}, transferred={self.transferred!r}>"
    
        def __str__(self) -> str:
            return f"{self.__class__.__name__}<{self.id}[{self.run_id}]> on {self.local_address}"
    
        def __hash__(self) -> int:
            return self.run_id
    
        @contextlib.contextmanager
        def _capture_metrics(self, where: str) -> Iterator[None]:
            """Capture context_meter metrics as
    
                {('p2p', <span id>, 'foreground|background...', label, unit): value}
    
            **Note 1:** When the metric is not logged by a background task
            (where='foreground'), this produces a duplicated metric under
    
                {('execute', <span id>, <task prefix>, label, unit): value}
    
            This is by design so that one can have a holistic view of the whole shuffle
            process.
    
            **Note 2:** We're immediately writing to Worker.digests.
            We don't temporarily store metrics under ShuffleRun as we would lose those
            recorded between the heartbeat and when the ShuffleRun object is deleted at the
            end of a run.
            """
    
            def callback(label: Hashable, value: float, unit: str) -> None:
                if not isinstance(label, tuple):
                    label = (label,)
                if isinstance(label[0], str) and label[0].startswith("p2p-"):
                    label = (label[0][len("p2p-") :], *label[1:])
                name = ("p2p", self.span_id, where, *label, unit)
    
                self.digest_metric(name, value)
    
            with context_meter.add_callback(callback, allow_offload="background" in where):
                yield
    
        async def barrier(self, run_ids: Sequence[int]) -> int:
            self.raise_if_closed()
            consistent = all(run_id == self.run_id for run_id in run_ids)
            # TODO: Consider broadcast pinging once when the shuffle starts to warm
            # up the comm pool on scheduler side
            await self.scheduler.shuffle_barrier(
                id=self.id, run_id=self.run_id, consistent=consistent
            )
            return self.run_id
    
        async def _send(
            self, address: str, shards: list[tuple[_T_partition_id, Any]] | bytes
        ) -> None:
            self.raise_if_closed()
            return await self.rpc(address).shuffle_receive(
                data=to_serialize(shards),
                shuffle_id=self.id,
                run_id=self.run_id,
            )
    
        async def send(
            self, address: str, shards: list[tuple[_T_partition_id, Any]]
        ) -> None:
            if _mean_shard_size(shards) < 65536:
                # Don't send buffers individually over the tcp comms.
                # Instead, merge everything into an opaque bytes blob, send it all at once,
                # and unpickle it on the other side.
                # Performance tests informing the size threshold:
                # https://github.com/dask/distributed/pull/8318
                shards_or_bytes: list | bytes = pickle.dumps(shards)
            else:
                shards_or_bytes = shards
    
            def _send() -> Coroutine[Any, Any, None]:
                return self._send(address, shards_or_bytes)
    
            return await retry(
                _send,
                count=self.RETRY_COUNT,
                delay_min=self.RETRY_DELAY_MIN,
                delay_max=self.RETRY_DELAY_MAX,
            )
    
        async def offload(
            self, func: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs
        ) -> _T:
            self.raise_if_closed()
            with context_meter.meter("offload"):
                return await run_in_executor_with_context(
                    self.executor, func, *args, **kwargs
                )
    
        def heartbeat(self) -> dict[str, Any]:
            comm_heartbeat = self._comm_buffer.heartbeat()
            comm_heartbeat["read"] = self.total_recvd
            return {
                "disk": self._disk_buffer.heartbeat(),
                "comm": comm_heartbeat,
                "start": self.start_time,
            }
    
        async def _write_to_comm(
            self, data: dict[str, tuple[_T_partition_id, Any]]
        ) -> None:
            self.raise_if_closed()
            await self._comm_buffer.write(data)
    
        async def _write_to_disk(self, data: dict[NDIndex, Any]) -> None:
            self.raise_if_closed()
            await self._disk_buffer.write(
                {"_".join(str(i) for i in k): v for k, v in data.items()}
            )
    
        def raise_if_closed(self) -> None:
            if self.closed:
                if self._exception:
                    raise self._exception
                raise ShuffleClosedError(f"{self} has already been closed")
    
        async def inputs_done(self) -> None:
            self.raise_if_closed()
            self.transferred = True
            await self._flush_comm()
            try:
                self._comm_buffer.raise_on_exception()
            except Exception as e:
                self._exception = e
                raise
    
        async def _flush_comm(self) -> None:
            self.raise_if_closed()
            await self._comm_buffer.flush()
    
        async def flush_receive(self) -> None:
            self.raise_if_closed()
            await self._disk_buffer.flush()
    
        async def close(self) -> None:
            if self.closed:  # pragma: no cover
                await self._closed_event.wait()
                return
    
            self.closed = True
            await self._comm_buffer.close()
            await self._disk_buffer.close()
            self._closed_event.set()
    
        def fail(self, exception: Exception) -> None:
            if not self.closed:
                self._exception = exception
    
        def _read_from_disk(self, id: NDIndex) -> list[Any]:  # TODO: Typing
            self.raise_if_closed()
            return self._disk_buffer.read("_".join(str(i) for i in id))
    
        async def receive(self, data: list[tuple[_T_partition_id, Any]] | bytes) -> None:
            if isinstance(data, bytes):
                # Unpack opaque blob. See send()
                data = cast(list[tuple[_T_partition_id, Any]], pickle.loads(data))
            await self._receive(data)
    
        async def _ensure_output_worker(self, i: _T_partition_id, key: Key) -> None:
            assigned_worker = self._get_assigned_worker(i)
    
            if assigned_worker != self.local_address:
                result = await self.scheduler.shuffle_restrict_task(
                    id=self.id, run_id=self.run_id, key=key, worker=assigned_worker
                )
                if result["status"] == "error":
                    raise RuntimeError(result["message"])
                assert result["status"] == "OK"
                raise Reschedule()
    
        @abc.abstractmethod
        def _get_assigned_worker(self, i: _T_partition_id) -> str:
            """Get the address of the worker assigned to the output partition"""
    
        @abc.abstractmethod
        async def _receive(self, data: list[tuple[_T_partition_id, Any]]) -> None:
            """Receive shards belonging to output partitions of this shuffle run"""
    
        def add_partition(
            self, data: _T_partition_type, partition_id: _T_partition_id
        ) -> int:
            self.raise_if_closed()
            if self.transferred:
                raise RuntimeError(f"Cannot add more partitions to {self}")
            # Log metrics both in the "execute" and in the "p2p" contexts
            with self._capture_metrics("foreground"):
                with (
                    context_meter.meter("p2p-shard-partition-noncpu"),
                    context_meter.meter("p2p-shard-partition-cpu", func=thread_time),
                ):
                    shards = self._shard_partition(data, partition_id)
                sync(self._loop, self._write_to_comm, shards)
            return self.run_id
    
        @abc.abstractmethod
        def _shard_partition(
            self, data: _T_partition_type, partition_id: _T_partition_id
        ) -> dict[str, tuple[_T_partition_id, Any]]:
            """Shard an input partition by the assigned output workers"""
    
        def get_output_partition(
            self, partition_id: _T_partition_id, key: Key, **kwargs: Any
        ) -> _T_partition_type:
            self.raise_if_closed()
            sync(self._loop, self._ensure_output_worker, partition_id, key)
            if not self.transferred:
                raise RuntimeError("`get_output_partition` called before barrier task")
            sync(self._loop, self.flush_receive)
            with (
                # Log metrics both in the "execute" and in the "p2p" contexts
                self._capture_metrics("foreground"),
                context_meter.meter("p2p-get-output-noncpu"),
                context_meter.meter("p2p-get-output-cpu", func=thread_time),
            ):
                return self._get_output_partition(partition_id, key, **kwargs)
    
        @abc.abstractmethod
        def _get_output_partition(
            self, partition_id: _T_partition_id, key: Key, **kwargs: Any
        ) -> _T_partition_type:
            """Get an output partition to the shuffle run"""
    
        @abc.abstractmethod
        def read(self, path: Path) -> tuple[Any, int]:
            """Read shards from disk"""
    
        @abc.abstractmethod
        def deserialize(self, buffer: Any) -> Any:
            """Deserialize shards"""
    
    
    def get_worker_plugin() -> ShuffleWorkerPlugin:
        from distributed import get_worker
    
        try:
            worker = get_worker()
        except ValueError as e:
            raise RuntimeError(
                "`shuffle='p2p'` requires Dask's distributed scheduler. This task is not running on a Worker; "
                "please confirm that you've created a distributed Client and are submitting this computation through it."
            ) from e
        try:
            return worker.plugins["shuffle"]  # type: ignore
        except KeyError as e:
            raise RuntimeError(
                f"The worker {worker.address} does not have a P2P shuffle plugin."
            ) from e
    
    
    _BARRIER_PREFIX = "shuffle-barrier-"
    
    
    def barrier_key(shuffle_id: ShuffleId) -> str:
        return _BARRIER_PREFIX + shuffle_id
    
    
    def id_from_key(key: Key) -> ShuffleId | None:
        if not isinstance(key, str) or not key.startswith(_BARRIER_PREFIX):
            return None
        return ShuffleId(key[len(_BARRIER_PREFIX) :])
    
    
    class ShuffleType(Enum):
        DATAFRAME = "DataFrameShuffle"
        ARRAY_RECHUNK = "ArrayRechunk"
    
    
    @dataclass(frozen=True)
    class ShuffleRunSpec(Generic[_T_partition_id]):
        run_id: int = field(init=False, default_factory=partial(next, itertools.count(1)))
        spec: ShuffleSpec
        worker_for: dict[_T_partition_id, str]
        span_id: str | None
    
        @property
        def id(self) -> ShuffleId:
            return self.spec.id
    
    
    @dataclass(frozen=True)
    class ShuffleSpec(abc.ABC, Generic[_T_partition_id]):
        id: ShuffleId
        disk: bool
    
        @property
        @abc.abstractmethod
        def output_partitions(self) -> Generator[_T_partition_id, None, None]:
            """Output partitions"""
    
        @abc.abstractmethod
        def pick_worker(self, partition: _T_partition_id, workers: Sequence[str]) -> str:
            """Pick a worker for a partition"""
    
        def create_new_run(
            self,
            worker_for: dict[_T_partition_id, str],
            span_id: str | None,
        ) -> SchedulerShuffleState:
            return SchedulerShuffleState(
                run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for, span_id=span_id),
                participating_workers=set(worker_for.values()),
            )
    
        def validate_data(self, data: Any) -> None:
            """Validate payload data before shuffling"""
    
        @abc.abstractmethod
        def create_run_on_worker(
            self,
            run_id: int,
            span_id: str | None,
            worker_for: dict[_T_partition_id, str],
            plugin: ShuffleWorkerPlugin,
        ) -> ShuffleRun:
            """Create the new shuffle run on the worker."""
    
    
    @dataclass(eq=False)
    class SchedulerShuffleState(Generic[_T_partition_id]):
        run_spec: ShuffleRunSpec
        participating_workers: set[str]
        _archived_by: str | None = field(default=None, init=False)
    
        @property
        def id(self) -> ShuffleId:
            return self.run_spec.id
    
        @property
        def run_id(self) -> int:
            return self.run_spec.run_id
    
        def __str__(self) -> str:
            return f"{self.__class__.__name__}<{self.id}[{self.run_id}]>"
    
        def __hash__(self) -> int:
            return hash(self.run_id)
    
    
    @contextlib.contextmanager
    def handle_transfer_errors(id: ShuffleId) -> Iterator[None]:
        try:
>           yield

distributed/shuffle/_core.py:494: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_shuffle.py:79: in shuffle_transfer
    return get_worker_plugin().add_partition(
distributed/shuffle/_worker_plugin.py:346: in add_partition
    return shuffle_run.add_partition(
distributed/shuffle/_core.py:343: in add_partition
    shards = self._shard_partition(data, partition_id)
distributed/shuffle/_shuffle.py:521: in _shard_partition
    out = split_by_worker(
distributed/shuffle/_shuffle.py:339: in split_by_worker
    t = to_pyarrow_table_dispatch(df, preserve_index=True)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/utils.py:773: in __call__
    return meth(arg, *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    from __future__ import annotations
    
    import warnings
    from collections.abc import Iterable
    
    import numpy as np
    import pandas as pd
    from pandas.api.types import is_scalar, union_categoricals
    
    from dask.array.core import Array
    from dask.array.dispatch import percentile_lookup
    from dask.array.percentile import _percentile
    from dask.backends import CreationDispatch, DaskBackendEntrypoint
    from dask.dataframe._compat import PANDAS_GE_220, is_any_real_numeric_dtype
    from dask.dataframe.core import DataFrame, Index, Scalar, Series, _Frame
    from dask.dataframe.dispatch import (
        categorical_dtype_dispatch,
        concat,
        concat_dispatch,
        from_pyarrow_table_dispatch,
        get_parallel_type,
        group_split_dispatch,
        grouper_dispatch,
        hash_object_dispatch,
        is_categorical_dtype_dispatch,
        make_meta_dispatch,
        make_meta_obj,
        meta_lib_from_array,
        meta_nonempty,
        partd_encode_dispatch,
        pyarrow_schema_dispatch,
        to_pandas_dispatch,
        to_pyarrow_table_dispatch,
        tolist_dispatch,
        union_categoricals_dispatch,
    )
    from dask.dataframe.extensions import make_array_nonempty, make_scalar
    from dask.dataframe.utils import (
        _empty_series,
        _nonempty_scalar,
        _scalar_from_dtype,
        is_float_na_dtype,
        is_integer_na_dtype,
    )
    from dask.sizeof import SimpleSizeof, sizeof
    from dask.utils import is_arraylike, is_series_like, typename
    
    
    class DataFrameBackendEntrypoint(DaskBackendEntrypoint):
        """Dask-DataFrame version of ``DaskBackendEntrypoint``
    
        See Also
        --------
        PandasBackendEntrypoint
        """
    
        @staticmethod
        def from_dict(data: dict, *, npartitions: int, **kwargs):
            """Create a DataFrame collection from a dictionary
    
            Parameters
            ----------
            data : dict
                Of the form {field : array-like} or {field : dict}.
            npartitions : int
                The desired number of output partitions.
            **kwargs :
                Optional backend kwargs.
    
            See Also
            --------
            dask.dataframe.io.io.from_dict
            """
            raise NotImplementedError
    
        @staticmethod
        def read_parquet(path: str | list, **kwargs):
            """Read Parquet files into a DataFrame collection
    
            Parameters
            ----------
            path : str or list
                Source path(s).
            **kwargs :
                Optional backend kwargs.
    
            See Also
            --------
            dask.dataframe.io.parquet.core.read_parquet
            """
            raise NotImplementedError
    
        @staticmethod
        def read_json(url_path: str | list, **kwargs):
            """Read json files into a DataFrame collection
    
            Parameters
            ----------
            url_path : str or list
                Source path(s).
            **kwargs :
                Optional backend kwargs.
    
            See Also
            --------
            dask.dataframe.io.json.read_json
            """
            raise NotImplementedError
    
        @staticmethod
        def read_orc(path: str | list, **kwargs):
            """Read ORC files into a DataFrame collection
    
            Parameters
            ----------
            path : str or list
                Source path(s).
            **kwargs :
                Optional backend kwargs.
    
            See Also
            --------
            dask.dataframe.io.orc.core.read_orc
            """
            raise NotImplementedError
    
        @staticmethod
        def read_csv(urlpath: str | list, **kwargs):
            """Read CSV files into a DataFrame collection
    
            Parameters
            ----------
            urlpath : str or list
                Source path(s).
            **kwargs :
                Optional backend kwargs.
    
            See Also
            --------
            dask.dataframe.io.csv.read_csv
            """
            raise NotImplementedError
    
        @staticmethod
        def read_hdf(pattern: str | list, key: str, **kwargs):
            """Read HDF5 files into a DataFrame collection
    
            Parameters
            ----------
            pattern : str or list
                Source path(s).
            key : str
                Group identifier in the store.
            **kwargs :
                Optional backend kwargs.
    
            See Also
            --------
            dask.dataframe.io.hdf.read_hdf
            """
            raise NotImplementedError
    
    
    dataframe_creation_dispatch = CreationDispatch(
        module_name="dataframe",
        default="pandas",
        entrypoint_class=DataFrameBackendEntrypoint,
        name="dataframe_creation_dispatch",
    )
    
    
    ##########
    # Pandas #
    ##########
    
    
    @make_scalar.register(np.dtype)
    def _(dtype):
        return _scalar_from_dtype(dtype)
    
    
    @make_scalar.register(pd.Timestamp)
    @make_scalar.register(pd.Timedelta)
    @make_scalar.register(pd.Period)
    @make_scalar.register(pd.Interval)
    def _(x):
        return x
    
    
    @make_meta_dispatch.register((pd.Series, pd.DataFrame))
    def _(x, index=None):
        out = x.iloc[:0].copy(deep=True)
        # index isn't copied by default in pandas, even if deep=true
        out.index = out.index.copy(deep=True)
        return out
    
    
    @make_meta_dispatch.register(pd.Index)
    def _(x, index=None):
        return x[0:0].copy(deep=True)
    
    
    meta_object_types: tuple[type, ...] = (pd.Series, pd.DataFrame, pd.Index, pd.MultiIndex)
    try:
        import scipy.sparse as sp
    
        meta_object_types += (sp.spmatrix,)
    except ImportError:
        pass
    
    
    @pyarrow_schema_dispatch.register((pd.DataFrame,))
    def get_pyarrow_schema_pandas(obj, preserve_index=None):
        import pyarrow as pa
    
        return pa.Schema.from_pandas(obj, preserve_index=preserve_index)
    
    
    @to_pyarrow_table_dispatch.register((pd.DataFrame,))
    def get_pyarrow_table_from_pandas(obj, **kwargs):
        # `kwargs` must be supported by `pyarrow.Table.to_pandas`
>       import pyarrow as pa
E       ModuleNotFoundError: import of pyarrow halted; None in sys.modules

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/backends.py:222: ModuleNotFoundError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:41183', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df

    @gen_cluster(client=True)
    async def test_minimal_version(c, s, a, b):
        no_pyarrow_ctx = (
            mock.patch.dict("sys.modules", {"pyarrow": None})
            if pa is not None
            else contextlib.nullcontext()
        )
        with no_pyarrow_ctx:
            A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
            a = dd.repartition(A, [0, 4, 5])
    
            B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
            b = dd.repartition(B, [0, 2, 5])
    
            with pytest.raises(
                ModuleNotFoundError, match="requires pyarrow"
            ), dask.config.set({"dataframe.shuffle.method": "p2p"}):
>               await c.compute(dd.merge(a, b, left_on="x", right_on="z"))

distributed/shuffle/tests/test_merge.py:72: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/client.py:331: in _result
    raise exc.with_traceback(tb)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_merge.py:781: in assign_index_merge_transfer
    return merge_transfer(
distributed/shuffle/_merge.py:150: in merge_transfer
    return shuffle_transfer(
distributed/shuffle/_shuffle.py:78: in shuffle_transfer
    with handle_transfer_errors(id):
../../../miniconda3/envs/dask-distributed/lib/python3.10/contextlib.py:153: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    from __future__ import annotations
    
    import abc
    import asyncio
    import contextlib
    import itertools
    import pickle
    import time
    from collections.abc import (
        Callable,
        Coroutine,
        Generator,
        Hashable,
        Iterable,
        Iterator,
        Sequence,
    )
    from concurrent.futures import ThreadPoolExecutor
    from dataclasses import dataclass, field
    from enum import Enum
    from functools import partial
    from pathlib import Path
    from typing import TYPE_CHECKING, Any, Generic, NewType, TypeVar, cast
    
    from tornado.ioloop import IOLoop
    
    import dask.config
    from dask.core import flatten
    from dask.typing import Key
    from dask.utils import parse_timedelta
    
    from distributed.core import PooledRPCCall
    from distributed.exceptions import Reschedule
    from distributed.metrics import context_meter, thread_time
    from distributed.protocol import to_serialize
    from distributed.shuffle._comms import CommShardsBuffer
    from distributed.shuffle._disk import DiskShardsBuffer
    from distributed.shuffle._exceptions import ShuffleClosedError
    from distributed.shuffle._limiter import ResourceLimiter
    from distributed.shuffle._memory import MemoryShardsBuffer
    from distributed.utils import run_in_executor_with_context, sync
    from distributed.utils_comm import retry
    
    if TYPE_CHECKING:
        # TODO import from typing (requires Python >=3.10)
        from typing_extensions import ParamSpec, TypeAlias
    
        _P = ParamSpec("_P")
    
        # circular dependencies
        from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin
    
    ShuffleId = NewType("ShuffleId", str)
    NDIndex: TypeAlias = tuple[int, ...]
    
    
    _T_partition_id = TypeVar("_T_partition_id")
    _T_partition_type = TypeVar("_T_partition_type")
    _T = TypeVar("_T")
    
    
    class ShuffleRun(Generic[_T_partition_id, _T_partition_type]):
        id: ShuffleId
        run_id: int
        span_id: str | None
        local_address: str
        executor: ThreadPoolExecutor
        rpc: Callable[[str], PooledRPCCall]
        digest_metric: Callable[[Hashable, float], None]
        scheduler: PooledRPCCall
        closed: bool
        _disk_buffer: DiskShardsBuffer | MemoryShardsBuffer
        _comm_buffer: CommShardsBuffer
        received: set[_T_partition_id]
        total_recvd: int
        start_time: float
        _exception: Exception | None
        _closed_event: asyncio.Event
        _loop: IOLoop
    
        RETRY_COUNT: int
        RETRY_DELAY_MIN: float
        RETRY_DELAY_MAX: float
    
        def __init__(
            self,
            id: ShuffleId,
            run_id: int,
            span_id: str | None,
            local_address: str,
            directory: str,
            executor: ThreadPoolExecutor,
            rpc: Callable[[str], PooledRPCCall],
            digest_metric: Callable[[Hashable, float], None],
            scheduler: PooledRPCCall,
            memory_limiter_disk: ResourceLimiter,
            memory_limiter_comms: ResourceLimiter,
            disk: bool,
            loop: IOLoop,
        ):
            self.id = id
            self.run_id = run_id
            self.span_id = span_id
            self.local_address = local_address
            self.executor = executor
            self.rpc = rpc
            self.digest_metric = digest_metric
            self.scheduler = scheduler
            self.closed = False
    
            # Initialize buffers and start background tasks
            # Don't log metrics issued by the background tasks onto the dask task that
            # spawned this object
            with context_meter.clear_callbacks():
                with self._capture_metrics("background-disk"):
                    if disk:
                        self._disk_buffer = DiskShardsBuffer(
                            directory=directory,
                            read=self.read,
                            memory_limiter=memory_limiter_disk,
                        )
                    else:
                        self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)
    
                with self._capture_metrics("background-comms"):
                    self._comm_buffer = CommShardsBuffer(
                        send=self.send, memory_limiter=memory_limiter_comms
                    )
    
            # TODO: reduce number of connections to number of workers
            # MultiComm.max_connections = min(10, n_workers)
    
            self.transferred = False
            self.received = set()
            self.total_recvd = 0
            self.start_time = time.time()
            self._exception = None
            self._closed_event = asyncio.Event()
            self._loop = loop
    
            self.RETRY_COUNT = dask.config.get("distributed.p2p.comm.retry.count")
            self.RETRY_DELAY_MIN = parse_timedelta(
                dask.config.get("distributed.p2p.comm.retry.delay.min"), default="s"
            )
            self.RETRY_DELAY_MAX = parse_timedelta(
                dask.config.get("distributed.p2p.comm.retry.delay.max"), default="s"
            )
    
        def __repr__(self) -> str:
            return f"<{self.__class__.__name__}: id={self.id!r}, run_id={self.run_id!r}, local_address={self.local_address!r}, closed={self.closed!r}, transferred={self.transferred!r}>"
    
        def __str__(self) -> str:
            return f"{self.__class__.__name__}<{self.id}[{self.run_id}]> on {self.local_address}"
    
        def __hash__(self) -> int:
            return self.run_id
    
        @contextlib.contextmanager
        def _capture_metrics(self, where: str) -> Iterator[None]:
            """Capture context_meter metrics as
    
                {('p2p', <span id>, 'foreground|background...', label, unit): value}
    
            **Note 1:** When the metric is not logged by a background task
            (where='foreground'), this produces a duplicated metric under
    
                {('execute', <span id>, <task prefix>, label, unit): value}
    
            This is by design so that one can have a holistic view of the whole shuffle
            process.
    
            **Note 2:** We're immediately writing to Worker.digests.
            We don't temporarily store metrics under ShuffleRun as we would lose those
            recorded between the heartbeat and when the ShuffleRun object is deleted at the
            end of a run.
            """
    
            def callback(label: Hashable, value: float, unit: str) -> None:
                if not isinstance(label, tuple):
                    label = (label,)
                if isinstance(label[0], str) and label[0].startswith("p2p-"):
                    label = (label[0][len("p2p-") :], *label[1:])
                name = ("p2p", self.span_id, where, *label, unit)
    
                self.digest_metric(name, value)
    
            with context_meter.add_callback(callback, allow_offload="background" in where):
                yield
    
        async def barrier(self, run_ids: Sequence[int]) -> int:
            self.raise_if_closed()
            consistent = all(run_id == self.run_id for run_id in run_ids)
            # TODO: Consider broadcast pinging once when the shuffle starts to warm
            # up the comm pool on scheduler side
            await self.scheduler.shuffle_barrier(
                id=self.id, run_id=self.run_id, consistent=consistent
            )
            return self.run_id
    
        async def _send(
            self, address: str, shards: list[tuple[_T_partition_id, Any]] | bytes
        ) -> None:
            self.raise_if_closed()
            return await self.rpc(address).shuffle_receive(
                data=to_serialize(shards),
                shuffle_id=self.id,
                run_id=self.run_id,
            )
    
        async def send(
            self, address: str, shards: list[tuple[_T_partition_id, Any]]
        ) -> None:
            if _mean_shard_size(shards) < 65536:
                # Don't send buffers individually over the tcp comms.
                # Instead, merge everything into an opaque bytes blob, send it all at once,
                # and unpickle it on the other side.
                # Performance tests informing the size threshold:
                # https://github.com/dask/distributed/pull/8318
                shards_or_bytes: list | bytes = pickle.dumps(shards)
            else:
                shards_or_bytes = shards
    
            def _send() -> Coroutine[Any, Any, None]:
                return self._send(address, shards_or_bytes)
    
            return await retry(
                _send,
                count=self.RETRY_COUNT,
                delay_min=self.RETRY_DELAY_MIN,
                delay_max=self.RETRY_DELAY_MAX,
            )
    
        async def offload(
            self, func: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs
        ) -> _T:
            self.raise_if_closed()
            with context_meter.meter("offload"):
                return await run_in_executor_with_context(
                    self.executor, func, *args, **kwargs
                )
    
        def heartbeat(self) -> dict[str, Any]:
            comm_heartbeat = self._comm_buffer.heartbeat()
            comm_heartbeat["read"] = self.total_recvd
            return {
                "disk": self._disk_buffer.heartbeat(),
                "comm": comm_heartbeat,
                "start": self.start_time,
            }
    
        async def _write_to_comm(
            self, data: dict[str, tuple[_T_partition_id, Any]]
        ) -> None:
            self.raise_if_closed()
            await self._comm_buffer.write(data)
    
        async def _write_to_disk(self, data: dict[NDIndex, Any]) -> None:
            self.raise_if_closed()
            await self._disk_buffer.write(
                {"_".join(str(i) for i in k): v for k, v in data.items()}
            )
    
        def raise_if_closed(self) -> None:
            if self.closed:
                if self._exception:
                    raise self._exception
                raise ShuffleClosedError(f"{self} has already been closed")
    
        async def inputs_done(self) -> None:
            self.raise_if_closed()
            self.transferred = True
            await self._flush_comm()
            try:
                self._comm_buffer.raise_on_exception()
            except Exception as e:
                self._exception = e
                raise
    
        async def _flush_comm(self) -> None:
            self.raise_if_closed()
            await self._comm_buffer.flush()
    
        async def flush_receive(self) -> None:
            self.raise_if_closed()
            await self._disk_buffer.flush()
    
        async def close(self) -> None:
            if self.closed:  # pragma: no cover
                await self._closed_event.wait()
                return
    
            self.closed = True
            await self._comm_buffer.close()
            await self._disk_buffer.close()
            self._closed_event.set()
    
        def fail(self, exception: Exception) -> None:
            if not self.closed:
                self._exception = exception
    
        def _read_from_disk(self, id: NDIndex) -> list[Any]:  # TODO: Typing
            self.raise_if_closed()
            return self._disk_buffer.read("_".join(str(i) for i in id))
    
        async def receive(self, data: list[tuple[_T_partition_id, Any]] | bytes) -> None:
            if isinstance(data, bytes):
                # Unpack opaque blob. See send()
                data = cast(list[tuple[_T_partition_id, Any]], pickle.loads(data))
            await self._receive(data)
    
        async def _ensure_output_worker(self, i: _T_partition_id, key: Key) -> None:
            assigned_worker = self._get_assigned_worker(i)
    
            if assigned_worker != self.local_address:
                result = await self.scheduler.shuffle_restrict_task(
                    id=self.id, run_id=self.run_id, key=key, worker=assigned_worker
                )
                if result["status"] == "error":
                    raise RuntimeError(result["message"])
                assert result["status"] == "OK"
                raise Reschedule()
    
        @abc.abstractmethod
        def _get_assigned_worker(self, i: _T_partition_id) -> str:
            """Get the address of the worker assigned to the output partition"""
    
        @abc.abstractmethod
        async def _receive(self, data: list[tuple[_T_partition_id, Any]]) -> None:
            """Receive shards belonging to output partitions of this shuffle run"""
    
        def add_partition(
            self, data: _T_partition_type, partition_id: _T_partition_id
        ) -> int:
            self.raise_if_closed()
            if self.transferred:
                raise RuntimeError(f"Cannot add more partitions to {self}")
            # Log metrics both in the "execute" and in the "p2p" contexts
            with self._capture_metrics("foreground"):
                with (
                    context_meter.meter("p2p-shard-partition-noncpu"),
                    context_meter.meter("p2p-shard-partition-cpu", func=thread_time),
                ):
                    shards = self._shard_partition(data, partition_id)
                sync(self._loop, self._write_to_comm, shards)
            return self.run_id
    
        @abc.abstractmethod
        def _shard_partition(
            self, data: _T_partition_type, partition_id: _T_partition_id
        ) -> dict[str, tuple[_T_partition_id, Any]]:
            """Shard an input partition by the assigned output workers"""
    
        def get_output_partition(
            self, partition_id: _T_partition_id, key: Key, **kwargs: Any
        ) -> _T_partition_type:
            self.raise_if_closed()
            sync(self._loop, self._ensure_output_worker, partition_id, key)
            if not self.transferred:
                raise RuntimeError("`get_output_partition` called before barrier task")
            sync(self._loop, self.flush_receive)
            with (
                # Log metrics both in the "execute" and in the "p2p" contexts
                self._capture_metrics("foreground"),
                context_meter.meter("p2p-get-output-noncpu"),
                context_meter.meter("p2p-get-output-cpu", func=thread_time),
            ):
                return self._get_output_partition(partition_id, key, **kwargs)
    
        @abc.abstractmethod
        def _get_output_partition(
            self, partition_id: _T_partition_id, key: Key, **kwargs: Any
        ) -> _T_partition_type:
            """Get an output partition to the shuffle run"""
    
        @abc.abstractmethod
        def read(self, path: Path) -> tuple[Any, int]:
            """Read shards from disk"""
    
        @abc.abstractmethod
        def deserialize(self, buffer: Any) -> Any:
            """Deserialize shards"""
    
    
    def get_worker_plugin() -> ShuffleWorkerPlugin:
        from distributed import get_worker
    
        try:
            worker = get_worker()
        except ValueError as e:
            raise RuntimeError(
                "`shuffle='p2p'` requires Dask's distributed scheduler. This task is not running on a Worker; "
                "please confirm that you've created a distributed Client and are submitting this computation through it."
            ) from e
        try:
            return worker.plugins["shuffle"]  # type: ignore
        except KeyError as e:
            raise RuntimeError(
                f"The worker {worker.address} does not have a P2P shuffle plugin."
            ) from e
    
    
    _BARRIER_PREFIX = "shuffle-barrier-"
    
    
    def barrier_key(shuffle_id: ShuffleId) -> str:
        return _BARRIER_PREFIX + shuffle_id
    
    
    def id_from_key(key: Key) -> ShuffleId | None:
        if not isinstance(key, str) or not key.startswith(_BARRIER_PREFIX):
            return None
        return ShuffleId(key[len(_BARRIER_PREFIX) :])
    
    
    class ShuffleType(Enum):
        DATAFRAME = "DataFrameShuffle"
        ARRAY_RECHUNK = "ArrayRechunk"
    
    
    @dataclass(frozen=True)
    class ShuffleRunSpec(Generic[_T_partition_id]):
        run_id: int = field(init=False, default_factory=partial(next, itertools.count(1)))
        spec: ShuffleSpec
        worker_for: dict[_T_partition_id, str]
        span_id: str | None
    
        @property
        def id(self) -> ShuffleId:
            return self.spec.id
    
    
    @dataclass(frozen=True)
    class ShuffleSpec(abc.ABC, Generic[_T_partition_id]):
        id: ShuffleId
        disk: bool
    
        @property
        @abc.abstractmethod
        def output_partitions(self) -> Generator[_T_partition_id, None, None]:
            """Output partitions"""
    
        @abc.abstractmethod
        def pick_worker(self, partition: _T_partition_id, workers: Sequence[str]) -> str:
            """Pick a worker for a partition"""
    
        def create_new_run(
            self,
            worker_for: dict[_T_partition_id, str],
            span_id: str | None,
        ) -> SchedulerShuffleState:
            return SchedulerShuffleState(
                run_spec=ShuffleRunSpec(spec=self, worker_for=worker_for, span_id=span_id),
                participating_workers=set(worker_for.values()),
            )
    
        def validate_data(self, data: Any) -> None:
            """Validate payload data before shuffling"""
    
        @abc.abstractmethod
        def create_run_on_worker(
            self,
            run_id: int,
            span_id: str | None,
            worker_for: dict[_T_partition_id, str],
            plugin: ShuffleWorkerPlugin,
        ) -> ShuffleRun:
            """Create the new shuffle run on the worker."""
    
    
    @dataclass(eq=False)
    class SchedulerShuffleState(Generic[_T_partition_id]):
        run_spec: ShuffleRunSpec
        participating_workers: set[str]
        _archived_by: str | None = field(default=None, init=False)
    
        @property
        def id(self) -> ShuffleId:
            return self.run_spec.id
    
        @property
        def run_id(self) -> int:
            return self.run_spec.run_id
    
        def __str__(self) -> str:
            return f"{self.__class__.__name__}<{self.id}[{self.run_id}]>"
    
        def __hash__(self) -> int:
            return hash(self.run_id)
    
    
    @contextlib.contextmanager
    def handle_transfer_errors(id: ShuffleId) -> Iterator[None]:
        try:
            yield
        except ShuffleClosedError:
            raise Reschedule()
        except Exception as e:
>           raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e
E           RuntimeError: P2P shuffling ffaba2702f1cd74baf9575a827c62460 failed during transfer phase

distributed/shuffle/_core.py:498: RuntimeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_basic_merge[inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
AttributeError: 'DataFrame' object has no attribute '_select_columns_or_index'
self = df, key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
>           return object.__getattribute__(self, key)
E           AttributeError: 'FromPandasDivisions' object has no attribute '_select_columns_or_index'

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_core.py:439: AttributeError

During handling of the above exception, another exception occurred:

self = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
            return object.__getattribute__(self, key)
        except AttributeError as err:
            try:
                # Fall back to `expr` API
                # (Making sure to convert to/from Expr)
>               val = getattr(self.expr, key)

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:498: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = df, key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            return object.__getattribute__(self, key)
        except AttributeError as err:
            if key.startswith("_meta"):
                # Avoid a recursive loop if/when `self._meta*`
                # produces an `AttributeError`
                raise RuntimeError(
                    f"Failed to generate metadata for {self}. "
                    "This operation may not be supported by the current backend."
                )
    
            # Allow operands to be accessed as attributes
            # as long as the keys are not already reserved
            # by existing methods/properties
            _parameters = type(self)._parameters
            if key in _parameters:
                idx = _parameters.index(key)
                return self.operands[idx]
            if is_dataframe_like(self._meta) and key in self._meta.columns:
                return self[key]
    
            link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
>           raise AttributeError(
                f"{err}\n\n"
                "This often means that you are attempting to use an unsupported "
                f"API function. Current API coverage is documented here: {link}."
            )
E           AttributeError: 'FromPandasDivisions' object has no attribute '_select_columns_or_index'
E           
E           This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_core.py:460: AttributeError

During handling of the above exception, another exception occurred:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37597', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'inner'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
>       joined = hash_join(a, "y", b, "y", how)

distributed/shuffle/tests/test_merge.py:84: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_merge.py:107: in hash_join_p2p
    lhs = _calculate_partitions(lhs, left_on, npartitions)
distributed/shuffle/_merge.py:47: in _calculate_partitions
    index = _prepare_index_for_partitioning(df, index)
distributed/shuffle/_merge.py:37: in _prepare_index_for_partitioning
    index = df._select_columns_or_index(index)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:2720: in __getattr__
    return super().__getattr__(key)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:504: in __getattr__
    raise err
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
>           return object.__getattribute__(self, key)
E           AttributeError: 'DataFrame' object has no attribute '_select_columns_or_index'

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:493: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_basic_merge[left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
AttributeError: 'DataFrame' object has no attribute '_select_columns_or_index'
self = df, key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
>           return object.__getattribute__(self, key)
E           AttributeError: 'FromPandasDivisions' object has no attribute '_select_columns_or_index'

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_core.py:439: AttributeError

During handling of the above exception, another exception occurred:

self = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
            return object.__getattribute__(self, key)
        except AttributeError as err:
            try:
                # Fall back to `expr` API
                # (Making sure to convert to/from Expr)
>               val = getattr(self.expr, key)

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:498: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = df, key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            return object.__getattribute__(self, key)
        except AttributeError as err:
            if key.startswith("_meta"):
                # Avoid a recursive loop if/when `self._meta*`
                # produces an `AttributeError`
                raise RuntimeError(
                    f"Failed to generate metadata for {self}. "
                    "This operation may not be supported by the current backend."
                )
    
            # Allow operands to be accessed as attributes
            # as long as the keys are not already reserved
            # by existing methods/properties
            _parameters = type(self)._parameters
            if key in _parameters:
                idx = _parameters.index(key)
                return self.operands[idx]
            if is_dataframe_like(self._meta) and key in self._meta.columns:
                return self[key]
    
            link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
>           raise AttributeError(
                f"{err}\n\n"
                "This often means that you are attempting to use an unsupported "
                f"API function. Current API coverage is documented here: {link}."
            )
E           AttributeError: 'FromPandasDivisions' object has no attribute '_select_columns_or_index'
E           
E           This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_core.py:460: AttributeError

During handling of the above exception, another exception occurred:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42057', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'left'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
>       joined = hash_join(a, "y", b, "y", how)

distributed/shuffle/tests/test_merge.py:84: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_merge.py:107: in hash_join_p2p
    lhs = _calculate_partitions(lhs, left_on, npartitions)
distributed/shuffle/_merge.py:47: in _calculate_partitions
    index = _prepare_index_for_partitioning(df, index)
distributed/shuffle/_merge.py:37: in _prepare_index_for_partitioning
    index = df._select_columns_or_index(index)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:2720: in __getattr__
    return super().__getattr__(key)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:504: in __getattr__
    raise err
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
>           return object.__getattribute__(self, key)
E           AttributeError: 'DataFrame' object has no attribute '_select_columns_or_index'

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:493: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_basic_merge[right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
AttributeError: 'DataFrame' object has no attribute '_select_columns_or_index'
self = df, key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
>           return object.__getattribute__(self, key)
E           AttributeError: 'FromPandasDivisions' object has no attribute '_select_columns_or_index'

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_core.py:439: AttributeError

During handling of the above exception, another exception occurred:

self = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
            return object.__getattribute__(self, key)
        except AttributeError as err:
            try:
                # Fall back to `expr` API
                # (Making sure to convert to/from Expr)
>               val = getattr(self.expr, key)

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:498: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = df, key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            return object.__getattribute__(self, key)
        except AttributeError as err:
            if key.startswith("_meta"):
                # Avoid a recursive loop if/when `self._meta*`
                # produces an `AttributeError`
                raise RuntimeError(
                    f"Failed to generate metadata for {self}. "
                    "This operation may not be supported by the current backend."
                )
    
            # Allow operands to be accessed as attributes
            # as long as the keys are not already reserved
            # by existing methods/properties
            _parameters = type(self)._parameters
            if key in _parameters:
                idx = _parameters.index(key)
                return self.operands[idx]
            if is_dataframe_like(self._meta) and key in self._meta.columns:
                return self[key]
    
            link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
>           raise AttributeError(
                f"{err}\n\n"
                "This often means that you are attempting to use an unsupported "
                f"API function. Current API coverage is documented here: {link}."
            )
E           AttributeError: 'FromPandasDivisions' object has no attribute '_select_columns_or_index'
E           
E           This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_core.py:460: AttributeError

During handling of the above exception, another exception occurred:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43891', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'right'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
>       joined = hash_join(a, "y", b, "y", how)

distributed/shuffle/tests/test_merge.py:84: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_merge.py:107: in hash_join_p2p
    lhs = _calculate_partitions(lhs, left_on, npartitions)
distributed/shuffle/_merge.py:47: in _calculate_partitions
    index = _prepare_index_for_partitioning(df, index)
distributed/shuffle/_merge.py:37: in _prepare_index_for_partitioning
    index = df._select_columns_or_index(index)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:2720: in __getattr__
    return super().__getattr__(key)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:504: in __getattr__
    raise err
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
>           return object.__getattribute__(self, key)
E           AttributeError: 'DataFrame' object has no attribute '_select_columns_or_index'

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:493: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_basic_merge[outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
AttributeError: 'DataFrame' object has no attribute '_select_columns_or_index'
self = df, key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
>           return object.__getattribute__(self, key)
E           AttributeError: 'FromPandasDivisions' object has no attribute '_select_columns_or_index'

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_core.py:439: AttributeError

During handling of the above exception, another exception occurred:

self = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
            return object.__getattribute__(self, key)
        except AttributeError as err:
            try:
                # Fall back to `expr` API
                # (Making sure to convert to/from Expr)
>               val = getattr(self.expr, key)

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:498: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = df, key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            return object.__getattribute__(self, key)
        except AttributeError as err:
            if key.startswith("_meta"):
                # Avoid a recursive loop if/when `self._meta*`
                # produces an `AttributeError`
                raise RuntimeError(
                    f"Failed to generate metadata for {self}. "
                    "This operation may not be supported by the current backend."
                )
    
            # Allow operands to be accessed as attributes
            # as long as the keys are not already reserved
            # by existing methods/properties
            _parameters = type(self)._parameters
            if key in _parameters:
                idx = _parameters.index(key)
                return self.operands[idx]
            if is_dataframe_like(self._meta) and key in self._meta.columns:
                return self[key]
    
            link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
>           raise AttributeError(
                f"{err}\n\n"
                "This often means that you are attempting to use an unsupported "
                f"API function. Current API coverage is documented here: {link}."
            )
E           AttributeError: 'FromPandasDivisions' object has no attribute '_select_columns_or_index'
E           
E           This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_core.py:460: AttributeError

During handling of the above exception, another exception occurred:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33765', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'outer'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
>       joined = hash_join(a, "y", b, "y", how)

distributed/shuffle/tests/test_merge.py:84: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/_merge.py:107: in hash_join_p2p
    lhs = _calculate_partitions(lhs, left_on, npartitions)
distributed/shuffle/_merge.py:47: in _calculate_partitions
    index = _prepare_index_for_partitioning(df, index)
distributed/shuffle/_merge.py:37: in _prepare_index_for_partitioning
    index = df._select_columns_or_index(index)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:2720: in __getattr__
    return super().__getattr__(key)
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:504: in __getattr__
    raise err
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
key = '_select_columns_or_index'

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
>           return object.__getattribute__(self, key)
E           AttributeError: 'DataFrame' object has no attribute '_select_columns_or_index'

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:493: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_merge[True-inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 7s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 4s]
Raw output
AttributeError: 'Future' object has no attribute 'columns'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39933', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'inner', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
            result = await c.compute(joined)
            list_eq(result, pd.merge(A, B, on="y", how=how))
            assert all(d is None for d in joined.divisions)
    
            list_eq(
                await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)),
                pd.merge(A, B, left_on="x", right_on="z", how=how),
            )
            list_eq(
                await c.compute(
                    dd.merge(
                        a,
                        b,
                        left_on="x",
                        right_on="z",
                        how=how,
                        suffixes=("1", "2"),
                    )
                ),
                pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
            )
    
            list_eq(
                await c.compute(dd.merge(a, b, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(a, B, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(A, b, how=how)),
                pd.merge(A, B, how=how),
            )
            # Note: No await since A and B are both pandas dataframes and this doesn't
            # actually submit anything
>           list_eq(
                c.compute(dd.merge(A, B, how=how)),
                pd.merge(A, B, how=how),
            )

distributed/shuffle/tests/test_merge.py:227: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

aa = <Future: cancelled, key: finalize-c2d01fa93f1da710051065f774ef2820>
bb =    x  y  z
0  1  1  6
1  2  1  6
2  5  3  5
3  6  4  4
4  6  4  3

    def list_eq(aa, bb):
        if isinstance(aa, dd.DataFrame):
            a = aa.compute(scheduler="sync")
        else:
            a = aa
        if isinstance(bb, dd.DataFrame):
            b = bb.compute(scheduler="sync")
        else:
            b = bb
>       tm.assert_index_equal(a.columns, b.columns)
E       AttributeError: 'Future' object has no attribute 'columns'

distributed/shuffle/tests/test_merge.py:43: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_merge[True-outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 8s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 4s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 5s]
Raw output
AttributeError: 'Future' object has no attribute 'columns'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34581', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'outer', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
            result = await c.compute(joined)
            list_eq(result, pd.merge(A, B, on="y", how=how))
            assert all(d is None for d in joined.divisions)
    
            list_eq(
                await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)),
                pd.merge(A, B, left_on="x", right_on="z", how=how),
            )
            list_eq(
                await c.compute(
                    dd.merge(
                        a,
                        b,
                        left_on="x",
                        right_on="z",
                        how=how,
                        suffixes=("1", "2"),
                    )
                ),
                pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
            )
    
            list_eq(
                await c.compute(dd.merge(a, b, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(a, B, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(A, b, how=how)),
                pd.merge(A, B, how=how),
            )
            # Note: No await since A and B are both pandas dataframes and this doesn't
            # actually submit anything
>           list_eq(
                c.compute(dd.merge(A, B, how=how)),
                pd.merge(A, B, how=how),
            )

distributed/shuffle/tests/test_merge.py:227: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

aa = <Future: cancelled, key: finalize-ef0be5f37449cd8bc6c70a26dcce9082>
bb =      x  y    z
0  1.0  1  6.0
1  2.0  1  6.0
2  3.0  2  NaN
3  4.0  2  NaN
4  5.0  3  5.0
5  6.0  4  4.0
6  6.0  4  3.0
7  NaN  5  2.0
8  NaN  6  1.0

    def list_eq(aa, bb):
        if isinstance(aa, dd.DataFrame):
            a = aa.compute(scheduler="sync")
        else:
            a = aa
        if isinstance(bb, dd.DataFrame):
            b = bb.compute(scheduler="sync")
        else:
            b = bb
>       tm.assert_index_equal(a.columns, b.columns)
E       AttributeError: 'Future' object has no attribute 'columns'

distributed/shuffle/tests/test_merge.py:43: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_merge[True-left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 8s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 4s]
Raw output
AttributeError: 'Future' object has no attribute 'columns'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33525', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'left', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
            result = await c.compute(joined)
            list_eq(result, pd.merge(A, B, on="y", how=how))
            assert all(d is None for d in joined.divisions)
    
            list_eq(
                await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)),
                pd.merge(A, B, left_on="x", right_on="z", how=how),
            )
            list_eq(
                await c.compute(
                    dd.merge(
                        a,
                        b,
                        left_on="x",
                        right_on="z",
                        how=how,
                        suffixes=("1", "2"),
                    )
                ),
                pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
            )
    
            list_eq(
                await c.compute(dd.merge(a, b, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(a, B, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(A, b, how=how)),
                pd.merge(A, B, how=how),
            )
            # Note: No await since A and B are both pandas dataframes and this doesn't
            # actually submit anything
>           list_eq(
                c.compute(dd.merge(A, B, how=how)),
                pd.merge(A, B, how=how),
            )

distributed/shuffle/tests/test_merge.py:227: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

aa = <Future: cancelled, key: finalize-705645df12ad9279e21f02234b5790d8>
bb =    x  y    z
0  1  1  6.0
1  2  1  6.0
2  3  2  NaN
3  4  2  NaN
4  5  3  5.0
5  6  4  4.0
6  6  4  3.0

    def list_eq(aa, bb):
        if isinstance(aa, dd.DataFrame):
            a = aa.compute(scheduler="sync")
        else:
            a = aa
        if isinstance(bb, dd.DataFrame):
            b = bb.compute(scheduler="sync")
        else:
            b = bb
>       tm.assert_index_equal(a.columns, b.columns)
E       AttributeError: 'Future' object has no attribute 'columns'

distributed/shuffle/tests/test_merge.py:43: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_merge[True-right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 6s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 4s]
Raw output
AttributeError: 'Future' object has no attribute 'columns'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39603', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'right', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
            result = await c.compute(joined)
            list_eq(result, pd.merge(A, B, on="y", how=how))
            assert all(d is None for d in joined.divisions)
    
            list_eq(
                await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)),
                pd.merge(A, B, left_on="x", right_on="z", how=how),
            )
            list_eq(
                await c.compute(
                    dd.merge(
                        a,
                        b,
                        left_on="x",
                        right_on="z",
                        how=how,
                        suffixes=("1", "2"),
                    )
                ),
                pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
            )
    
            list_eq(
                await c.compute(dd.merge(a, b, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(a, B, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(A, b, how=how)),
                pd.merge(A, B, how=how),
            )
            # Note: No await since A and B are both pandas dataframes and this doesn't
            # actually submit anything
>           list_eq(
                c.compute(dd.merge(A, B, how=how)),
                pd.merge(A, B, how=how),
            )

distributed/shuffle/tests/test_merge.py:227: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

aa = <Future: cancelled, key: finalize-720259ef7b4a0b226445c69fcc19f06a>
bb =      x  y  z
0  1.0  1  6
1  2.0  1  6
2  5.0  3  5
3  6.0  4  4
4  6.0  4  3
5  NaN  5  2
6  NaN  6  1

    def list_eq(aa, bb):
        if isinstance(aa, dd.DataFrame):
            a = aa.compute(scheduler="sync")
        else:
            a = aa
        if isinstance(bb, dd.DataFrame):
            b = bb.compute(scheduler="sync")
        else:
            b = bb
>       tm.assert_index_equal(a.columns, b.columns)
E       AttributeError: 'Future' object has no attribute 'columns'

distributed/shuffle/tests/test_merge.py:43: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_merge[False-inner] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 9s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 3s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 4s]
Raw output
AttributeError: 'Future' object has no attribute 'columns'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35641', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'inner', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
            result = await c.compute(joined)
            list_eq(result, pd.merge(A, B, on="y", how=how))
            assert all(d is None for d in joined.divisions)
    
            list_eq(
                await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)),
                pd.merge(A, B, left_on="x", right_on="z", how=how),
            )
            list_eq(
                await c.compute(
                    dd.merge(
                        a,
                        b,
                        left_on="x",
                        right_on="z",
                        how=how,
                        suffixes=("1", "2"),
                    )
                ),
                pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
            )
    
            list_eq(
                await c.compute(dd.merge(a, b, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(a, B, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(A, b, how=how)),
                pd.merge(A, B, how=how),
            )
            # Note: No await since A and B are both pandas dataframes and this doesn't
            # actually submit anything
>           list_eq(
                c.compute(dd.merge(A, B, how=how)),
                pd.merge(A, B, how=how),
            )

distributed/shuffle/tests/test_merge.py:227: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

aa = <Future: cancelled, key: finalize-c2d01fa93f1da710051065f774ef2820>
bb =    x  y  z
0  1  1  6
1  2  1  6
2  5  3  5
3  6  4  4
4  6  4  3

    def list_eq(aa, bb):
        if isinstance(aa, dd.DataFrame):
            a = aa.compute(scheduler="sync")
        else:
            a = aa
        if isinstance(bb, dd.DataFrame):
            b = bb.compute(scheduler="sync")
        else:
            b = bb
>       tm.assert_index_equal(a.columns, b.columns)
E       AttributeError: 'Future' object has no attribute 'columns'

distributed/shuffle/tests/test_merge.py:43: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_merge[False-outer] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 10s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 4s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 4s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 5s]
Raw output
AttributeError: 'Future' object has no attribute 'columns'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37195', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'outer', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
            result = await c.compute(joined)
            list_eq(result, pd.merge(A, B, on="y", how=how))
            assert all(d is None for d in joined.divisions)
    
            list_eq(
                await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)),
                pd.merge(A, B, left_on="x", right_on="z", how=how),
            )
            list_eq(
                await c.compute(
                    dd.merge(
                        a,
                        b,
                        left_on="x",
                        right_on="z",
                        how=how,
                        suffixes=("1", "2"),
                    )
                ),
                pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
            )
    
            list_eq(
                await c.compute(dd.merge(a, b, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(a, B, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(A, b, how=how)),
                pd.merge(A, B, how=how),
            )
            # Note: No await since A and B are both pandas dataframes and this doesn't
            # actually submit anything
>           list_eq(
                c.compute(dd.merge(A, B, how=how)),
                pd.merge(A, B, how=how),
            )

distributed/shuffle/tests/test_merge.py:227: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

aa = <Future: cancelled, key: finalize-ef0be5f37449cd8bc6c70a26dcce9082>
bb =      x  y    z
0  1.0  1  6.0
1  2.0  1  6.0
2  3.0  2  NaN
3  4.0  2  NaN
4  5.0  3  5.0
5  6.0  4  4.0
6  6.0  4  3.0
7  NaN  5  2.0
8  NaN  6  1.0

    def list_eq(aa, bb):
        if isinstance(aa, dd.DataFrame):
            a = aa.compute(scheduler="sync")
        else:
            a = aa
        if isinstance(bb, dd.DataFrame):
            b = bb.compute(scheduler="sync")
        else:
            b = bb
>       tm.assert_index_equal(a.columns, b.columns)
E       AttributeError: 'Future' object has no attribute 'columns'

distributed/shuffle/tests/test_merge.py:43: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_merge[False-left] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 7s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 4s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 4s]
Raw output
AttributeError: 'Future' object has no attribute 'columns'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38203', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'left', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
            result = await c.compute(joined)
            list_eq(result, pd.merge(A, B, on="y", how=how))
            assert all(d is None for d in joined.divisions)
    
            list_eq(
                await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)),
                pd.merge(A, B, left_on="x", right_on="z", how=how),
            )
            list_eq(
                await c.compute(
                    dd.merge(
                        a,
                        b,
                        left_on="x",
                        right_on="z",
                        how=how,
                        suffixes=("1", "2"),
                    )
                ),
                pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
            )
    
            list_eq(
                await c.compute(dd.merge(a, b, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(a, B, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(A, b, how=how)),
                pd.merge(A, B, how=how),
            )
            # Note: No await since A and B are both pandas dataframes and this doesn't
            # actually submit anything
>           list_eq(
                c.compute(dd.merge(A, B, how=how)),
                pd.merge(A, B, how=how),
            )

distributed/shuffle/tests/test_merge.py:227: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

aa = <Future: cancelled, key: finalize-705645df12ad9279e21f02234b5790d8>
bb =    x  y    z
0  1  1  6.0
1  2  1  6.0
2  3  2  NaN
3  4  2  NaN
4  5  3  5.0
5  6  4  4.0
6  6  4  3.0

    def list_eq(aa, bb):
        if isinstance(aa, dd.DataFrame):
            a = aa.compute(scheduler="sync")
        else:
            a = aa
        if isinstance(bb, dd.DataFrame):
            b = bb.compute(scheduler="sync")
        else:
            b = bb
>       tm.assert_index_equal(a.columns, b.columns)
E       AttributeError: 'Future' object has no attribute 'columns'

distributed/shuffle/tests/test_merge.py:43: AttributeError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 11 runs failed: test_merge[False-right] (distributed.shuffle.tests.test_merge)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 6s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 3s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 4s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 5s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 4s]
Raw output
AttributeError: 'Future' object has no attribute 'columns'
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33887', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: from_pd_divs, 1 graph layer
Expr=df
how = 'right', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
            res = await c.compute(joined)
            assert_eq(
                res,
                pd.merge(A, B, left_index=True, right_index=True, how=how),
            )
            joined = dd.merge(a, b, on="y", how=how)
            result = await c.compute(joined)
            list_eq(result, pd.merge(A, B, on="y", how=how))
            assert all(d is None for d in joined.divisions)
    
            list_eq(
                await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)),
                pd.merge(A, B, left_on="x", right_on="z", how=how),
            )
            list_eq(
                await c.compute(
                    dd.merge(
                        a,
                        b,
                        left_on="x",
                        right_on="z",
                        how=how,
                        suffixes=("1", "2"),
                    )
                ),
                pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
            )
    
            list_eq(
                await c.compute(dd.merge(a, b, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(a, B, how=how)),
                pd.merge(A, B, how=how),
            )
            list_eq(
                await c.compute(dd.merge(A, b, how=how)),
                pd.merge(A, B, how=how),
            )
            # Note: No await since A and B are both pandas dataframes and this doesn't
            # actually submit anything
>           list_eq(
                c.compute(dd.merge(A, B, how=how)),
                pd.merge(A, B, how=how),
            )

distributed/shuffle/tests/test_merge.py:227: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

aa = <Future: cancelled, key: finalize-720259ef7b4a0b226445c69fcc19f06a>
bb =      x  y  z
0  1.0  1  6
1  2.0  1  6
2  5.0  3  5
3  6.0  4  4
4  6.0  4  3
5  NaN  5  2
6  NaN  6  1

    def list_eq(aa, bb):
        if isinstance(aa, dd.DataFrame):
            a = aa.compute(scheduler="sync")
        else:
            a = aa
        if isinstance(bb, dd.DataFrame):
            b = bb.compute(scheduler="sync")
        else:
            b = bb
>       tm.assert_index_equal(a.columns, b.columns)
E       AttributeError: 'Future' object has no attribute 'columns'

distributed/shuffle/tests/test_merge.py:43: AttributeError

Check warning on line 0 in distributed.tests.test_client

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 13 runs failed: test_recreate_error_collection (distributed.tests.test_client)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
UserWarning: 
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('a', 'int64'))
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39591', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43413', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = dask.bag<lambda, npartitions=4>

    @gen_cluster(client=True)
    async def test_recreate_error_collection(c, s, a, b):
        b = db.range(10, npartitions=4)
        b = b.map(lambda x: 1 / x)
        b = b.persist()
        f = c.compute(b)
    
        error_f = await c._get_errored_future(f)
        function, args, kwargs = await c._get_components_from_future(error_f)
        with pytest.raises(ZeroDivisionError):
            function(*args, **kwargs)
    
        dd = pytest.importorskip("dask.dataframe")
        import pandas as pd
    
        df = dd.from_pandas(pd.DataFrame({"a": [0, 1, 2, 3, 4]}), chunksize=2)
    
        def make_err(x):
            # because pandas would happily work with NaN
            if x == 0:
                raise ValueError
            return x
    
>       df2 = df.a.map(make_err)

distributed/tests/test_client.py:4830: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Dask Series Structure:
npartitions=3
0    int64
2      ...
4      ...
4      ...
Dask Name: getitem, 2 graph layers
Expr=df['a']
arg = <function test_recreate_error_collection.<locals>.make_err at 0x7f6c39b6e7a0>
na_action = None, meta = 0    1
1    1
Name: a, dtype: int64

    @derived_from(pd.Series)
    def map(self, arg, na_action=None, meta=None):
        if isinstance(arg, Series):
            if not expr.are_co_aligned(self.expr, arg.expr):
                if meta is None:
                    warnings.warn(meta_warning(meta))
                return new_collection(
                    expr.MapAlign(self, arg, op=None, na_action=na_action, meta=meta)
                )
        if meta is None:
            meta = expr._emulate(M.map, self, arg, na_action=na_action, udf=True)
>           warnings.warn(meta_warning(meta))
E           UserWarning: 
E           You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
E           To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
E             Before: .apply(func)
E             After:  .apply(func, meta=('a', 'int64'))

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:3816: UserWarning

Check warning on line 0 in distributed.tests.test_client

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 13 runs failed: test_recreate_task_collection (distributed.tests.test_client)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
UserWarning: 
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('a', 'int64'))
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37465', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39123', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = dask.bag<lambda, npartitions=4>

    @gen_cluster(client=True)
    async def test_recreate_task_collection(c, s, a, b):
        b = db.range(10, npartitions=4)
        b = b.map(lambda x: int(3628800 / (x + 1)))
        b = b.persist()
        f = c.compute(b)
    
        function, args, kwargs = await c._get_components_from_future(f)
        assert function(*args, **kwargs) == [
            3628800,
            1814400,
            1209600,
            907200,
            725760,
            604800,
            518400,
            453600,
            403200,
            362880,
        ]
    
        dd = pytest.importorskip("dask.dataframe")
        import pandas as pd
    
        df = dd.from_pandas(pd.DataFrame({"a": [0, 1, 2, 3, 4]}), chunksize=2)
    
>       df2 = df.a.map(lambda x: x + 1)

distributed/tests/test_client.py:4938: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Dask Series Structure:
npartitions=3
0    int64
2      ...
4      ...
4      ...
Dask Name: getitem, 2 graph layers
Expr=df['a']
arg = <function test_recreate_task_collection.<locals>.<lambda> at 0x7f6c0448bb50>
na_action = None, meta = 0    2
1    2
Name: a, dtype: int64

    @derived_from(pd.Series)
    def map(self, arg, na_action=None, meta=None):
        if isinstance(arg, Series):
            if not expr.are_co_aligned(self.expr, arg.expr):
                if meta is None:
                    warnings.warn(meta_warning(meta))
                return new_collection(
                    expr.MapAlign(self, arg, op=None, na_action=na_action, meta=meta)
                )
        if meta is None:
            meta = expr._emulate(M.map, self, arg, na_action=na_action, udf=True)
>           warnings.warn(meta_warning(meta))
E           UserWarning: 
E           You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
E           To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
E             Before: .apply(func)
E             After:  .apply(func, meta=('a', 'int64'))

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:3816: UserWarning

Check warning on line 0 in distributed.tests.test_client

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

5 out of 13 runs failed: test_file_descriptors_dont_leak[Worker] (distributed.tests.test_client)

artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
Worker = <class 'distributed.worker.Worker'>

    @pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
    @pytest.mark.skipif(MACOS, reason="dask/distributed#8075")
    @pytest.mark.parametrize(
        "Worker", [Worker, pytest.param(Nanny, marks=[pytest.mark.slow])]
    )
    @gen_test()
    async def test_file_descriptors_dont_leak(Worker):
        pytest.importorskip("pandas")
>       df = dask.datasets.timeseries(freq="10s", dtypes={"x": int, "y": float})

distributed/tests/test_client.py:6453: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/datasets.py:63: in timeseries
    return make_timeseries(
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x7f6bfcad4fa0>
args = None
meta =                x        y
timestamp                
2000-01-01  1005  0.37228
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-03 00:00:00'), Timestamp('2000-01-04 00:00:00'), Timestamp('2000-01-05 00:00:00'), Timestamp('2000-01-06 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00')], 1996445312), ([Timestamp('2000-01-02 00:00:00...-06 00:00:00')], 1654270537), ([Timestamp('2000-01-06 00:00:00'), Timestamp('2000-01-07 00:00:00')], 1627900200), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>
lengths = {30}, i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:5015: UserWarning

Check warning on line 0 in distributed.tests.test_client

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

5 out of 13 runs failed: test_file_descriptors_dont_leak[Nanny] (distributed.tests.test_client)

artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
Worker = <class 'distributed.nanny.Nanny'>

    @pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
    @pytest.mark.skipif(MACOS, reason="dask/distributed#8075")
    @pytest.mark.parametrize(
        "Worker", [Worker, pytest.param(Nanny, marks=[pytest.mark.slow])]
    )
    @gen_test()
    async def test_file_descriptors_dont_leak(Worker):
        pytest.importorskip("pandas")
>       df = dask.datasets.timeseries(freq="10s", dtypes={"x": int, "y": float})

distributed/tests/test_client.py:6453: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/datasets.py:63: in timeseries
    return make_timeseries(
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x7f6bfeb5b8b0>
args = None
meta =               x         y
timestamp                
2000-01-01  985  0.632398
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-03 00:00:00'), Timestamp('2000-01-04 00:00:00'), Timestamp('2000-01-05 00:00:00'), Timestamp('2000-01-06 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00')], 1085132115), ([Timestamp('2000-01-02 00:00:00...1-06 00:00:00')], 1541590477), ([Timestamp('2000-01-06 00:00:00'), Timestamp('2000-01-07 00:00:00')], 185559219), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>
lengths = {30}, i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:5015: UserWarning

Check warning on line 0 in distributed.tests.test_client

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 13 runs failed: test_futures_of_sorted (distributed.tests.test_client)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38891', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45943', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:34335', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_futures_of_sorted(c, s, a, b):
        pytest.importorskip("dask.dataframe")
>       df = await dask.datasets.timeseries(dtypes={"x": int}).persist()

distributed/tests/test_client.py:6561: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/datasets.py:63: in timeseries
    return make_timeseries(
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x7f6bfef25840>
args = None, meta =                x
timestamp       
2000-01-01  1056
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-03 00:00:00'), Timestamp('2000-01-04 00:00:00'), Timestamp('2000-01-05 00:00:00'), Timestamp('2000-01-06 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00')], 324447394), ([Timestamp('2000-01-02 00:00:00'...-06 00:00:00')], 1946694151), ([Timestamp('2000-01-06 00:00:00'), Timestamp('2000-01-07 00:00:00')], 1698805141), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>
lengths = {30}, i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:5015: UserWarning

Check warning on line 0 in distributed.tests.test_scheduler

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 13 runs failed: test_default_task_duration_splits (distributed.tests.test_scheduler)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 4s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 2s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 2s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 3s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 4s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 3s]
Raw output
assert 0 == 1
 +  where 0 = len([])
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43863', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39479', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38825', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_default_task_duration_splits(c, s, a, b):
        """Ensure that the default task durations for shuffle split tasks are, by default,
        aligned with the task names of dask.dask
        """
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
    
        # We don't care about the actual computation here but we'll schedule one anyhow to
        # verify that we're looking for the correct key
        npart = 10
        df = dd.from_pandas(pd.DataFrame({"A": range(100), "B": 1}), npartitions=npart)
        with dask.config.set({"dataframe.shuffle.method": "tasks"}):
            graph = df.shuffle(
                "A",
                # If we don't have enough partitions, we'll fall back to a simple shuffle
                max_branch=npart - 1,
            ).sum()
        fut = c.compute(graph)
        await wait(fut)
    
        split_prefix = [pre for pre in s.task_prefixes.keys() if "split" in pre]
>       assert len(split_prefix) == 1
E       assert 0 == 1
E        +  where 0 = len([])

distributed/tests/test_scheduler.py:2786: AssertionError

Check warning on line 0 in distributed.tests.test_scheduler

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 13 runs failed: test_tell_workers_when_peers_have_left (distributed.tests.test_scheduler)

artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 5s]
Raw output
assert 1709705861.645045 < (1709705856.62995 + 5)
 +  where 1709705861.645045 = time()
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38211', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44241', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:42719', name: 1, status: closed, stored: 1, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_tell_workers_when_peers_have_left(c, s, a, b):
        f = (await c.scatter({"f": 1}, workers=[a.address, b.address], broadcast=True))["f"]
    
        workers = {a.address: a, b.address: b}
        connect_timeout = parse_timedelta(
            dask.config.get("distributed.comm.timeouts.connect"), default="seconds"
        )
    
        class BrokenGatherDep(Worker):
            async def gather_dep(self, worker, *args, **kwargs):
                w = workers.pop(worker, None)
                if w is not None and workers:
                    w.listener.stop()
                    s.stream_comms[worker].abort()
    
                return await super().gather_dep(worker, *args, **kwargs)
    
        async with BrokenGatherDep(s.address, nthreads=1) as w3:
            start = time()
            g = await c.submit(inc, f, key="g", workers=[w3.address])
            # fails over to the second worker in less than the connect timeout
>           assert time() < start + connect_timeout
E           assert 1709705861.645045 < (1709705856.62995 + 5)
E            +  where 1709705861.645045 = time()

distributed/tests/test_scheduler.py:4624: AssertionError

Check warning on line 0 in distributed.tests.test_steal

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 13 runs failed: test_blocklist_shuffle_split (distributed.tests.test_steal)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 1s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 1s]
Raw output
assert set()
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33631', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:32783', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38277', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_blocklist_shuffle_split(c, s, a, b):
        pd = pytest.importorskip("pandas")
        dd = pytest.importorskip("dask.dataframe")
        npart = 10
        df = dd.from_pandas(pd.DataFrame({"A": range(100), "B": 1}), npartitions=npart)
        with dask.config.set({"dataframe.shuffle.method": "tasks"}):
            graph = df.shuffle(
                "A",
                # If we don't have enough partitions, we'll fall back to a simple shuffle
                max_branch=npart - 1,
            ).sum()
        res = c.compute(graph)
    
        while not s.tasks:
            await asyncio.sleep(0.005)
        prefixes = set(s.task_prefixes.keys())
        from distributed.stealing import fast_tasks
    
        blocked = fast_tasks & prefixes
>       assert blocked
E       assert set()

distributed/tests/test_steal.py:1034: AssertionError

Check warning on line 0 in distributed.tests.test_worker

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 13 runs failed: test_broken_comm (distributed.tests.test_worker)

artifacts/macos-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-ci1/pytest.xml [took 0s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37901', workers: 0, cores: 0, tasks: 0>
a = <BreakingWorker 'tcp://127.0.0.1:34497', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <BreakingWorker 'tcp://127.0.0.1:45827', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @pytest.mark.slow
    @gen_cluster(client=True, Worker=BreakingWorker)
    async def test_broken_comm(c, s, a, b):
        pytest.importorskip("dask.dataframe")
    
>       df = dask.datasets.timeseries(
            start="2000-01-01",
            end="2000-01-10",
        )

distributed/tests/test_worker.py:3454: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/datasets.py:63: in timeseries
    return make_timeseries(
../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x7f6be9d95330>
args = None
meta =            name    id         x         y
timestamp                                
2000-01-01  Dan  1022  0.989642 -0.951422
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-03 00:00:00'), Timestamp('2000-01-04 00:00:00'), Timestamp('2000-01-05 00:00:00'), Timestamp('2000-01-06 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00')], 1412395655), ([Timestamp('2000-01-02 00:00:00...1-06 00:00:00')], 158682245), ([Timestamp('2000-01-06 00:00:00'), Timestamp('2000-01-07 00:00:00')], 1629624879), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>, lengths = {9}
i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.10/site-packages/dask_expr/_collection.py:5015: UserWarning

Check warning on line 0 in distributed.shuffle.tests.test_metrics

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

10 out of 12 runs failed: test_dataframe (distributed.shuffle.tests.test_metrics)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:57177', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:57178', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:57181', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_dataframe(c, s, a, b):
        """Metrics are *almost* agnostic in dataframe shuffle vs. array rechunk.
        The only exception is the 'p2p-shards' metric, which is implemented separately.
        """
        dd = pytest.importorskip("dask.dataframe")
    
>       df = dask.datasets.timeseries(
            start="2000-01-01",
            end="2000-01-10",
            dtypes={"x": float, "y": float},
            freq="10 s",
        )

distributed/shuffle/tests/test_metrics.py:75: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/datasets.py:63: in timeseries
    return make_timeseries(
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x14cccd670>
args = None
meta =                    x         y
timestamp                     
2000-01-01  0.625845 -0.046717
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-03 00:00:00'), Timestamp('2000-01-04 00:00:00'), Timestamp('2000-01-05 00:00:00'), Timestamp('2000-01-06 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00')], 1122137641), ([Timestamp('2000-01-02 00:00:00...-06 00:00:00')], 1724851991), ([Timestamp('2000-01-06 00:00:00'), Timestamp('2000-01-07 00:00:00')], 1821482755), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>, lengths = {9}
i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:5015: UserWarning

Check warning on line 0 in distributed.tests.test_cancelled_state

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

1 out of 13 runs failed: test_deadlock_cancelled_after_inflight_before_gather_from_worker[True-cancelled] (distributed.tests.test_cancelled_state)

artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 30s]
Raw output
TimeoutError: Test timeout (30) hit after 29.994175910949707s.
========== Test stack trace starts here ==========
Stack for <Task pending name='Task-193733' coro=<test_deadlock_cancelled_after_inflight_before_gather_from_worker() running at D:\a\distributed\distributed\distributed\tests\test_cancelled_state.py:861> wait_for=<Future pending cb=[Task.task_wakeup()]>> (most recent call last):
  File "D:\a\distributed\distributed\distributed\tests\test_cancelled_state.py", line 861, in test_deadlock_cancelled_after_inflight_before_gather_from_worker
    await fut3
args = (), kwds = {'close_worker': True, 'intermediate_state': 'cancelled'}

    @wraps(func)
    def inner(*args, **kwds):
        with self._recreate_cm():
>           return func(*args, **kwds)

C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\contextlib.py:81: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\contextlib.py:81: in inner
    return func(*args, **kwds)
distributed\utils_test.py:1102: in test_func
    return _run_and_close_tornado(async_fn_outer)
distributed\utils_test.py:378: in _run_and_close_tornado
    return asyncio_run(inner_fn(), loop_factory=get_loop_factory())
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\runners.py:194: in run
    return runner.run(main)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\runners.py:118: in run
    return self._loop.run_until_complete(task)
C:\Users\runneradmin\miniconda3\envs\dask-distributed\Lib\asyncio\base_events.py:685: in run_until_complete
    return future.result()
distributed\utils_test.py:375: in inner_fn
    return await async_fn(*args, **kwargs)
distributed\utils_test.py:1099: in async_fn_outer
    return await utils_wait_for(async_fn(), timeout=timeout * 2)
distributed\utils.py:1935: in wait_for
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    async def async_fn():
        result = None
        with dask.config.set(config):
            async with (
                _cluster_factory() as (s, workers),
                _client_factory(s) as c,
            ):
                args = [s] + workers
                if c is not None:
                    args = [c] + args
                try:
                    coro = func(*args, *outer_args, **kwargs)
                    task = asyncio.create_task(coro)
                    coro2 = utils_wait_for(
                        asyncio.shield(task), timeout=deadline.remaining
                    )
                    result = await coro2
                    validate_state(s, *workers)
    
                except asyncio.TimeoutError:
                    assert task
                    elapsed = deadline.elapsed
                    buffer = io.StringIO()
                    # This stack indicates where the coro/test is suspended
                    task.print_stack(file=buffer)
    
                    if cluster_dump_directory:
                        await dump_cluster_state(
                            s=s,
                            ws=workers,
                            output_dir=cluster_dump_directory,
                            func_name=func.__name__,
                        )
    
                    task.cancel()
                    while not task.cancelled():
                        await asyncio.sleep(0.01)
    
                    # Hopefully, the hang has been caused by inconsistent
                    # state, which should be much more meaningful than the
                    # timeout
                    validate_state(s, *workers)
    
                    # Remove as much of the traceback as possible; it's
                    # uninteresting boilerplate from utils_test and asyncio
                    # and not from the code being tested.
>                   raise asyncio.TimeoutError(
                        f"Test timeout ({timeout}) hit after {elapsed}s.\n"
                        "========== Test stack trace starts here ==========\n"
                        f"{buffer.getvalue()}"
                    ) from None
E                   TimeoutError: Test timeout (30) hit after 29.994175910949707s.
E                   ========== Test stack trace starts here ==========
E                   Stack for <Task pending name='Task-193733' coro=<test_deadlock_cancelled_after_inflight_before_gather_from_worker() running at D:\a\distributed\distributed\distributed\tests\test_cancelled_state.py:861> wait_for=<Future pending cb=[Task.task_wakeup()]>> (most recent call last):
E                     File "D:\a\distributed\distributed\distributed\tests\test_cancelled_state.py", line 861, in test_deadlock_cancelled_after_inflight_before_gather_from_worker
E                       await fut3

distributed\utils_test.py:1041: TimeoutError

Check warning on line 0 in distributed.dashboard.tests.test_scheduler_bokeh

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 10 runs failed: test_shuffling (distributed.dashboard.tests.test_scheduler_bokeh)

artifacts/macos-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-notci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-queue-notci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-queue-notci1/pytest.xml [took 0s]
Raw output
UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:51925', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:51926', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:51929', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
    async def test_shuffling(c, s, a, b):
        pytest.importorskip("pyarrow")
        dd = pytest.importorskip("dask.dataframe")
        ss = Shuffling(s)
    
>       df = dask.datasets.timeseries(
            start="2000-01-01",
            end="2000-02-01",
            dtypes={"x": float, "y": float},
            freq="10 s",
        )

distributed/dashboard/tests/test_scheduler_bokeh.py:1342: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/datasets.py:63: in timeseries
    return make_timeseries(
../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask/dataframe/io/demo.py:434: in make_timeseries
    return from_map(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

func = <dask.dataframe.io.demo.MakeDataframePart object at 0x148542ab0>
args = None
meta =                    x         y
timestamp                     
2000-01-01  0.731475 -0.960988
divisions = [Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00'), Timestamp('2000-01-03 00:00:00'), Timestamp('2000-01-04 00:00:00'), Timestamp('2000-01-05 00:00:00'), Timestamp('2000-01-06 00:00:00'), ...]
label = 'make-timeseries', enforce_metadata = False
iterables = [[([Timestamp('2000-01-01 00:00:00'), Timestamp('2000-01-02 00:00:00')], 990018238), ([Timestamp('2000-01-02 00:00:00'...1-06 00:00:00')], 846223100), ([Timestamp('2000-01-06 00:00:00'), Timestamp('2000-01-07 00:00:00')], 1097343279), ...]]
kwargs = {}
DataFrameIOFunction = <class 'dask.dataframe.io.utils.DataFrameIOFunction'>
FromMap = <class 'dask_expr.io.io.FromMap'>
FromMapProjectable = <class 'dask_expr.io.io.FromMapProjectable'>
lengths = {31}, i = 0

    def from_map(
        func,
        *iterables,
        args=None,
        meta=no_default,
        divisions=None,
        label=None,
        enforce_metadata=False,
        **kwargs,
    ):
        """Create a dask-expr collection from a custom function map
    
        NOTE: The underlying ``Expr`` object produced by this API
        will support column projection (via ``simplify``) if
        the ``func`` argument has "columns" in its signature.
    
        """
        from dask.dataframe.io.utils import DataFrameIOFunction
    
        from dask_expr.io import FromMap, FromMapProjectable
    
        if "token" in kwargs:
            # This option doesn't really make sense in dask-expr
            raise NotImplementedError("dask_expr does not support a token argument.")
    
        lengths = set()
        iterables = list(iterables)
        for i, iterable in enumerate(iterables):
            if not isinstance(iterable, Iterable):
                raise ValueError(
                    f"All elements of `iterables` must be Iterable, got {type(iterable)}"
                )
            try:
                lengths.add(len(iterable))
            except (AttributeError, TypeError):
                iterables[i] = list(iterable)
                lengths.add(len(iterables[i]))
        if len(lengths) == 0:
            raise ValueError("`from_map` requires at least one Iterable input")
        elif len(lengths) > 1:
            raise ValueError("All `iterables` must have the same length")
        if lengths == {0}:
            raise ValueError("All `iterables` must have a non-zero length")
    
        # Check if `func` supports column projection
        allow_projection = False
        columns_arg_required = False
        if param := inspect.signature(func).parameters.get("columns", None):
            allow_projection = True
            columns_arg_required = param.default is param.empty
            if meta is no_default and columns_arg_required:
                raise TypeError(
                    "Argument `func` of `from_map` has a required `columns` "
                    " parameter and not `meta` provided."
                    "Either provide `meta` yourself or make `columns` an optional argument."
                )
        elif isinstance(func, DataFrameIOFunction):
>           warnings.warn(
                "dask_expr does not support the DataFrameIOFunction "
                "protocol for column projection. To enable column "
                "projection, please ensure that the signature of `func` "
                "includes a `columns=` keyword argument instead."
            )
E           UserWarning: dask_expr does not support the DataFrameIOFunction protocol for column projection. To enable column projection, please ensure that the signature of `func` includes a `columns=` keyword argument instead.

../../../miniconda3/envs/dask-distributed/lib/python3.12/site-packages/dask_expr/_collection.py:5015: UserWarning