diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 866d75858a..079729059f 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -155,7 +155,7 @@ jobs: # Increase this value to reset cache if # continuous_integration/environment-${{ matrix.environment }}.yaml has not # changed. See also same variable in .pre-commit-config.yaml - CACHE_NUMBER: 0 + CACHE_NUMBER: 2 id: cache - name: Update environment diff --git a/distributed/protocol/arrow.py b/distributed/protocol/arrow.py index dfa98fdcda..2f43c1cf9f 100644 --- a/distributed/protocol/arrow.py +++ b/distributed/protocol/arrow.py @@ -4,11 +4,6 @@ from distributed.protocol.serialize import dask_deserialize, dask_serialize -if int(pyarrow.__version__.split(".")[0]) < 16: - raise ImportError( - "Need pyarrow >=16.0. See https://arrow.apache.org/docs/python/install.html" - ) - @dask_serialize.register(pyarrow.RecordBatch) def serialize_batch(batch): diff --git a/distributed/shuffle/__init__.py b/distributed/shuffle/__init__.py index a2169105db..6ad3b8a9e5 100644 --- a/distributed/shuffle/__init__.py +++ b/distributed/shuffle/__init__.py @@ -1,12 +1,10 @@ from __future__ import annotations -from distributed.shuffle._arrow import check_minimal_arrow_version from distributed.shuffle._rechunk import rechunk_p2p from distributed.shuffle._scheduler_plugin import ShuffleSchedulerPlugin from distributed.shuffle._worker_plugin import ShuffleWorkerPlugin __all__ = [ - "check_minimal_arrow_version", "rechunk_p2p", "ShuffleSchedulerPlugin", "ShuffleWorkerPlugin", diff --git a/distributed/shuffle/_arrow.py b/distributed/shuffle/_arrow.py index 973e3414be..93cd8deccd 100644 --- a/distributed/shuffle/_arrow.py +++ b/distributed/shuffle/_arrow.py @@ -4,8 +4,6 @@ from pathlib import Path from typing import TYPE_CHECKING -from packaging.version import parse - from dask.utils import parse_bytes if TYPE_CHECKING: @@ -31,24 +29,6 @@ def check_dtype_support(meta_input: pd.DataFrame) -> None: raise TypeError("p2p does not support sparse data found in column '{name}'") -def check_minimal_arrow_version() -> None: - """Verify that the the correct version of pyarrow is installed to support - the P2P extension. - - Raises a ModuleNotFoundError if pyarrow is not installed or an - ImportError if the installed version is not recent enough. - """ - minversion = "14.0.1" - try: - import pyarrow as pa - except ModuleNotFoundError: - raise ModuleNotFoundError(f"P2P shuffling requires pyarrow>={minversion}") - if parse(pa.__version__) < parse(minversion): - raise ImportError( - f"P2P shuffling requires pyarrow>={minversion} but only found {pa.__version__}" - ) - - def concat_tables(tables: Iterable[pa.Table]) -> pa.Table: import pyarrow as pa diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index c00b742a9f..88e3c3200a 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -86,7 +86,6 @@ from distributed.diagnostics.plugin import UploadDirectory, WorkerPlugin from distributed.metrics import time from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler -from distributed.shuffle import check_minimal_arrow_version from distributed.sizeof import sizeof from distributed.utils import get_mp_context, is_valid_xml, open_port, sync, tmp_text from distributed.utils_test import ( @@ -3417,17 +3416,16 @@ async def test_cancel_clears_processing(c, s, *workers): def test_default_get(loop_in_thread): - has_pyarrow = False try: - check_minimal_arrow_version() - has_pyarrow = True - except ImportError: - pass + from dask.dataframe._compat import HAS_PYARROW + except ImportError: # No pandas + HAS_PYARROW = False + loop = loop_in_thread with cluster() as (s, [a, b]): pre_get = dask.base.get_scheduler() # These may change in the future but the selection below should not - distributed_default = "p2p" if has_pyarrow else "tasks" + distributed_default = "p2p" if HAS_PYARROW else "tasks" local_default = "disk" assert get_default_shuffle_method() == local_default with Client(s["address"], set_as_default=True, loop=loop) as c: