Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ jobs:
# Increase this value to reset cache if
# continuous_integration/environment-${{ matrix.environment }}.yaml has not
# changed. See also same variable in .pre-commit-config.yaml
CACHE_NUMBER: 0
CACHE_NUMBER: 2
id: cache

- name: Update environment
Expand Down
5 changes: 0 additions & 5 deletions distributed/protocol/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@

from distributed.protocol.serialize import dask_deserialize, dask_serialize

if int(pyarrow.__version__.split(".")[0]) < 16:
raise ImportError(
"Need pyarrow >=16.0. See https://arrow.apache.org/docs/python/install.html"
)


@dask_serialize.register(pyarrow.RecordBatch)
def serialize_batch(batch):
Expand Down
2 changes: 0 additions & 2 deletions distributed/shuffle/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from __future__ import annotations

from distributed.shuffle._arrow import check_minimal_arrow_version
from distributed.shuffle._rechunk import rechunk_p2p
from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin
from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin

__all__ = [
"check_minimal_arrow_version",
"rechunk_p2p",
"ShuffleSchedulerPlugin",
"ShuffleWorkerPlugin",
Expand Down
20 changes: 0 additions & 20 deletions distributed/shuffle/_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from pathlib import Path
from typing import TYPE_CHECKING

from packaging.version import parse

from dask.utils import parse_bytes

if TYPE_CHECKING:
Expand All @@ -31,24 +29,6 @@ def check_dtype_support(meta_input: pd.DataFrame) -> None:
raise TypeError("p2p does not support sparse data found in column '{name}'")


def check_minimal_arrow_version() -> None:
    """Verify that the correct version of pyarrow is installed to support
    the P2P extension.

    Raises a ModuleNotFoundError if pyarrow is not installed or an
    ImportError if the installed version is not recent enough.
    """
    # Minimum pyarrow release known to support the P2P shuffle extension.
    minversion = "14.0.1"
    try:
        # Imported lazily: pyarrow is an optional dependency and this check
        # is the gate that reports its absence with an actionable message.
        import pyarrow as pa
    except ModuleNotFoundError:
        raise ModuleNotFoundError(f"P2P shuffling requires pyarrow>={minversion}")
    # packaging.version.parse gives a proper semantic comparison (e.g. so
    # "14.0.1" compares correctly against "9.0.0"), unlike string comparison.
    if parse(pa.__version__) < parse(minversion):
        raise ImportError(
            f"P2P shuffling requires pyarrow>={minversion} but only found {pa.__version__}"
        )


def concat_tables(tables: Iterable[pa.Table]) -> pa.Table:
import pyarrow as pa

Expand Down
12 changes: 5 additions & 7 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
from distributed.diagnostics.plugin import UploadDirectory, WorkerPlugin
from distributed.metrics import time
from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler
from distributed.shuffle import check_minimal_arrow_version
from distributed.sizeof import sizeof
from distributed.utils import get_mp_context, is_valid_xml, open_port, sync, tmp_text
from distributed.utils_test import (
Expand Down Expand Up @@ -3417,17 +3416,16 @@ async def test_cancel_clears_processing(c, s, *workers):


def test_default_get(loop_in_thread):
has_pyarrow = False
try:
check_minimal_arrow_version()
has_pyarrow = True
except ImportError:
pass
from dask.dataframe._compat import HAS_PYARROW
except ImportError: # No pandas
HAS_PYARROW = False

loop = loop_in_thread
with cluster() as (s, [a, b]):
pre_get = dask.base.get_scheduler()
# These may change in the future but the selection below should not
distributed_default = "p2p" if has_pyarrow else "tasks"
distributed_default = "p2p" if HAS_PYARROW else "tasks"
local_default = "disk"
assert get_default_shuffle_method() == local_default
with Client(s["address"], set_as_default=True, loop=loop) as c:
Expand Down
Loading