From ad373f690dba495b121a07afbf0a7fddcdb2d38e Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Thu, 5 May 2022 13:07:49 -0500 Subject: [PATCH 01/14] Handle ModuleNotFoundError when importing multiprocessing. --- dask/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask/base.py b/dask/base.py index 6631a5cec99..aae7321dd0d 100644 --- a/dask/base.py +++ b/dask/base.py @@ -1294,7 +1294,7 @@ def _colorize(t): try: from dask import multiprocessing as dask_multiprocessing -except ImportError: +except (ImportError, ModuleNotFoundError): pass else: named_schedulers.update( From 93fc7fe6b6a20f315906834a1bdff1d65364a481 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Thu, 5 May 2022 14:02:21 -0500 Subject: [PATCH 02/14] Be more defensive about multiprocessing and threading --- dask/array/core.py | 13 +++++++++++-- dask/bag/core.py | 14 +++++++++++--- dask/base.py | 16 +++++++++++++--- dask/dataframe/core.py | 15 ++++++++++++--- dask/dataframe/io/hdf.py | 11 +++++++++-- dask/delayed.py | 14 ++++++++++++-- 6 files changed, 68 insertions(+), 15 deletions(-) diff --git a/dask/array/core.py b/dask/array/core.py index ddabc9449e4..80352444481 100644 --- a/dask/array/core.py +++ b/dask/array/core.py @@ -31,7 +31,7 @@ from tlz import accumulate, concat, first, frequencies, groupby, partition from tlz.curried import pluck -from dask import compute, config, core, threaded +from dask import compute, config, core from dask.array import chunk from dask.array.chunk import getitem from dask.array.chunk_types import is_valid_array_chunk, is_valid_chunk_type @@ -84,6 +84,15 @@ ) from dask.widgets import get_template +try: + from dask import threaded +except (ImportError, ModuleNotFoundError): + from dask import local + + DEFAULT_GET = local.get_sync +else: + DEFAULT_GET = threaded.get + config.update_defaults({"array": {"chunk-size": "128MiB", "rechunk-threshold": 4}}) unknown_chunk_message = ( @@ -1406,7 +1415,7 @@ def __dask_tokenize__(self): __dask_optimize__ = globalmethod( optimize, key="array_optimize", falsey=dont_optimize ) - __dask_scheduler__ = staticmethod(threaded.get) + __dask_scheduler__ = staticmethod(DEFAULT_GET) def __dask_postcompute__(self): return finalize, () diff --git a/dask/bag/core.py b/dask/bag/core.py index 456fda4fa75..3b00f7f43f9 100644 --- a/dask/bag/core.py +++ b/dask/bag/core.py @@ -46,7 +46,6 @@ from dask.core import flatten, get_dependencies, istask, quote, reverse_dict from dask.delayed import Delayed, unpack_collections from dask.highlevelgraph import HighLevelGraph -from dask.multiprocessing import get as mpget from dask.optimization import cull, fuse, inline from dask.sizeof import sizeof from dask.utils import ( @@ -64,6 +63,15 @@ takes_multiple_arguments, ) +try: + from dask import multiprocessing +except (ImportError, ModuleNotFoundError): + from dask import local + + DEFAULT_GET = local.get_sync +else: + DEFAULT_GET = multiprocessing.get + no_default = "__no__default__" no_result = type( "no_result", (object,), {"__slots__": (), "__reduce__": lambda self: "no_result"} @@ -371,7 +379,7 @@ def __dask_tokenize__(self): return self.key __dask_optimize__ = globalmethod(optimize, key="bag_optimize", falsey=dont_optimize) - __dask_scheduler__ = staticmethod(mpget) + __dask_scheduler__ = staticmethod(DEFAULT_GET) def __dask_postcompute__(self): return finalize_item, () @@ -481,7 +489,7 @@ def __dask_tokenize__(self): return self.name __dask_optimize__ = globalmethod(optimize, key="bag_optimize", falsey=dont_optimize) - __dask_scheduler__ = staticmethod(mpget) + 
__dask_scheduler__ = staticmethod(DEFAULT_GET) def __dask_postcompute__(self): return finalize, () diff --git a/dask/base.py b/dask/base.py index aae7321dd0d..c2b535db071 100644 --- a/dask/base.py +++ b/dask/base.py @@ -22,7 +22,7 @@ from tlz import curry, groupby, identity, merge from tlz.functoolz import Compose -from dask import config, local, threaded +from dask import config, local from dask.compatibility import _PY_VERSION from dask.context import thread_state from dask.core import flatten @@ -1288,10 +1288,20 @@ def _colorize(t): "sync": local.get_sync, "synchronous": local.get_sync, "single-threaded": local.get_sync, - "threads": threaded.get, - "threading": threaded.get, } +try: + from dask import threaded +except (ImportError, ModuleNotFoundError): + pass +else: + named_schedulers.update( + { + "threads": threaded.get, + "threading": threaded.get, + } + ) + try: from dask import multiprocessing as dask_multiprocessing except (ImportError, ModuleNotFoundError): diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 23e7674c431..d96eba1b0d2 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -20,7 +20,7 @@ from tlz import first, merge, partition_all, remove, unique import dask.array as da -from dask import core, threaded +from dask import core from dask.array.core import Array, normalize_arg from dask.bag import map_partitions as map_bag_partitions from dask.base import DaskMethodsMixin, dont_optimize, is_dask_collection, tokenize @@ -79,6 +79,15 @@ ) from dask.widgets import get_template +try: + from dask import threaded +except (ImportError, ModuleNotFoundError): + from dask import local + + DEFAULT_GET = local.get_sync +else: + DEFAULT_GET = threaded.get + no_default = "__no_default__" GROUP_KEYS_DEFAULT = None if PANDAS_GT_150 else True @@ -163,7 +172,7 @@ def __dask_layers__(self): __dask_optimize__ = globalmethod( optimize, key="dataframe_optimize", falsey=dont_optimize ) - __dask_scheduler__ = staticmethod(threaded.get) + __dask_scheduler__ = staticmethod(DEFAULT_GET) def __dask_postcompute__(self): return first, () @@ -345,7 +354,7 @@ def __dask_tokenize__(self): __dask_optimize__ = globalmethod( optimize, key="dataframe_optimize", falsey=dont_optimize ) - __dask_scheduler__ = staticmethod(threaded.get) + __dask_scheduler__ = staticmethod(DEFAULT_GET) def __dask_postcompute__(self): return finalize, () diff --git a/dask/dataframe/io/hdf.py b/dask/dataframe/io/hdf.py index 661939f00b1..7622cab758e 100644 --- a/dask/dataframe/io/hdf.py +++ b/dask/dataframe/io/hdf.py @@ -8,7 +8,7 @@ from fsspec.utils import build_name_function, stringify_path from tlz import merge -from dask import config, multiprocessing +from dask import config from dask.base import compute_as_if_collection, get_scheduler, tokenize from dask.dataframe.core import DataFrame from dask.dataframe.io.io import _link, from_map @@ -16,6 +16,13 @@ from dask.delayed import Delayed, delayed from dask.utils import get_scheduler_lock +try: + from dask import multiprocessing +except (ImportError, ModuleNotFoundError): + mpget = object() +else: + mpget = multiprocessing.get + def _pd_to_hdf(pd_to_hdf, lock, args, kwargs=None): """A wrapper function around pd_to_hdf that enables locking""" @@ -193,7 +200,7 @@ def to_hdf( if lock is None: if not single_node: lock = True - elif not single_file and _actual_get is not multiprocessing.get: + elif not single_file and _actual_get is not mpget: # if we're writing to multiple files with the multiprocessing # scheduler we don't need to lock lock = True 
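
Note: the `mpget = object()` fallback in the hdf.py hunk above is a sentinel:
when dask.multiprocessing cannot be imported, `mpget` is bound to a fresh
object that no real scheduler can be identical to, so the
`_actual_get is not mpget` test always takes the locking branch. A minimal
sketch of the idea (the `needs_lock` helper is hypothetical, for illustration
only):

    try:
        from dask import multiprocessing
        mpget = multiprocessing.get
    except ImportError:
        # Unique sentinel: `x is mpget` is False for every real scheduler.
        mpget = object()

    def needs_lock(actual_get):
        # The multiprocessing scheduler writes from separate processes and
        # needs no shared lock; everything else does.
        return actual_get is not mpget
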
diff --git a/dask/delayed.py b/dask/delayed.py index c83bf783808..36f3f3bd900 100644 --- a/dask/delayed.py +++ b/dask/delayed.py @@ -7,7 +7,7 @@ from tlz import concat, curry, merge, unique -from dask import config, threaded +from dask import config from dask.base import ( DaskMethodsMixin, dont_optimize, @@ -23,6 +23,16 @@ __all__ = ["Delayed", "delayed"] +try: + from dask import threaded +except (ImportError, ModuleNotFoundError): + from dask import local + + DEFAULT_GET = local.get_sync +else: + DEFAULT_GET = threaded.get + + def unzip(ls, nout): """Unzip a list of lists into ``nout`` outputs.""" out = list(zip(*ls)) @@ -518,7 +528,7 @@ def __dask_layers__(self): def __dask_tokenize__(self): return self.key - __dask_scheduler__ = staticmethod(threaded.get) + __dask_scheduler__ = staticmethod(DEFAULT_GET) __dask_optimize__ = globalmethod(optimize, key="delayed_optimize") def __dask_postcompute__(self): From e49374f60061595315d18d1e549b342bbe8da4fa Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Fri, 6 May 2022 15:35:40 -0500 Subject: [PATCH 03/14] Add extras for a pyodide install. Just jinja2 right now, but probably ipycytoscape later. --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index cbe669e8ce1..1f671d146a7 100755 --- a/setup.py +++ b/setup.py @@ -21,6 +21,7 @@ "jinja2", ], "delayed": [], # keeping for backwards compatibility + "pyodide": ["jinja2"], } extras_require["complete"] = sorted({v for req in extras_require.values() for v in req}) # after complete is set, add in test From ff6e8fec27a31ea041da87f6ed9e15819465c1e6 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Tue, 10 May 2022 15:28:21 -0700 Subject: [PATCH 04/14] Positional-only args for get functions. They were already inconsistent with regards to argument name, this just enforces it and allows for type checking of getters. --- dask/base.py | 3 ++- dask/local.py | 5 ++++- dask/multiprocessing.py | 6 ++++-- dask/threaded.py | 11 ++++++++++- dask/typing.py | 3 ++- 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/dask/base.py b/dask/base.py index c2b535db071..7d2b16138b1 100644 --- a/dask/base.py +++ b/dask/base.py @@ -30,6 +30,7 @@ from dask.core import literal, quote from dask.hashing import hash_buffer_hex from dask.system import CPU_COUNT +from dask.typing import SchedulerGetCallable from dask.utils import Dispatch, apply, ensure_dict, key_split __all__ = ( @@ -1284,7 +1285,7 @@ def _colorize(t): return "#" + h -named_schedulers = { +named_schedulers: dict[str, SchedulerGetCallable] = { "sync": local.get_sync, "synchronous": local.get_sync, "single-threaded": local.get_sync, diff --git a/dask/local.py b/dask/local.py index 24b45b049ae..8580b38d513 100644 --- a/dask/local.py +++ b/dask/local.py @@ -106,7 +106,10 @@ See the function ``inline_functions`` for more information. """ +from __future__ import annotations + import os +from collections.abc import Hashable, Mapping, Sequence from concurrent.futures import Executor, Future from functools import partial from queue import Empty, Queue @@ -545,7 +548,7 @@ def submit(self, fn, *args, **kwargs): synchronous_executor = SynchronousExecutor() -def get_sync(dsk, keys, **kwargs): +def get_sync(dsk: Mapping, keys: Sequence[Hashable] | Hashable, /, **kwargs): """A naive synchronous version of get_async Can be useful for debugging. 
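
Note: the bare `/` added to these signatures marks the parameters before it
as positional-only (Python 3.8+). The getters already disagreed on parameter
names (`dask` vs `dsk` in the protocol, `result` vs `keys` in threaded.get),
so making the first two arguments positional-only stops callers from
depending on any particular name. A minimal sketch of the behavior:

    def get(dsk, keys, /, **kwargs):
        return dsk, keys

    get({}, ["a"])  # fine: positional call
    # get(dsk={}, keys=["a"]) raises TypeError: the names are
    # positional-only, so the keywords land in **kwargs and the two
    # required positional arguments are reported missing.
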
diff --git a/dask/multiprocessing.py b/dask/multiprocessing.py index eaf9dbf5181..b6c9d418b99 100644 --- a/dask/multiprocessing.py +++ b/dask/multiprocessing.py @@ -7,6 +7,7 @@ import pickle import sys import traceback +from collections.abc import Hashable, Mapping, Sequence from concurrent.futures import ProcessPoolExecutor from functools import partial from warnings import warn @@ -143,8 +144,9 @@ def get_context(): def get( - dsk, - keys, + dsk: Mapping, + keys: Sequence[Hashable] | Hashable, + /, num_workers=None, func_loads=None, func_dumps=None, diff --git a/dask/threaded.py b/dask/threaded.py index 96bf1f37c59..c2951b66b3e 100644 --- a/dask/threaded.py +++ b/dask/threaded.py @@ -10,6 +10,7 @@ import sys import threading from collections import defaultdict +from collections.abc import Hashable, Mapping, Sequence from concurrent.futures import Executor, ThreadPoolExecutor from threading import Lock, current_thread @@ -32,7 +33,15 @@ def pack_exception(e, dumps): return e, sys.exc_info()[2] -def get(dsk, result, cache=None, num_workers=None, pool=None, **kwargs): +def get( + dsk: Mapping, + result: Sequence[Hashable] | Hashable, + /, + cache=None, + num_workers=None, + pool=None, + **kwargs, +): """Threaded cached implementation of dask.get Parameters diff --git a/dask/typing.py b/dask/typing.py index f8c2d26dc07..c29ad384085 100644 --- a/dask/typing.py +++ b/dask/typing.py @@ -20,8 +20,9 @@ class SchedulerGetCallable(Protocol): def __call__( self, - dask: Mapping, + dsk: Mapping, keys: Sequence[Hashable] | Hashable, + /, **kwargs: Any, ) -> Any: """Method called as the default scheduler for a collection. From 1de734311cf4da04743e5c6047f156021f44f4be Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Thu, 12 May 2022 17:19:37 -0700 Subject: [PATCH 05/14] Add sys.platform check, remove redundant error handling. 
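
ModuleNotFoundError has been a subclass of ImportError since Python 3.6, so
the earlier `except (ImportError, ModuleNotFoundError)` handlers caught
nothing that plain `except ImportError` would not; this patch collapses them
and keys the fallback on the platform instead. A condensed sketch of the
resulting pattern (the diffs below still keep a defensive try/except around
the import; a later patch in this series removes it):

    import sys

    # Pyodide and other WebAssembly builds report this platform string.
    _EMSCRIPTEN = sys.platform == "emscripten"

    if _EMSCRIPTEN:
        # No usable threads or subprocesses under Emscripten: default to
        # the single-threaded synchronous scheduler.
        from dask import local
        DEFAULT_GET = local.get_sync
    else:
        from dask import threaded
        DEFAULT_GET = threaded.get
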
--- dask/array/core.py | 14 ++++++++---- dask/bag/core.py | 14 ++++++++---- dask/base.py | 47 ++++++++++++++++++++-------------------- dask/compatibility.py | 2 ++ dask/dataframe/core.py | 14 ++++++++---- dask/dataframe/io/hdf.py | 2 +- dask/delayed.py | 14 ++++++++---- 7 files changed, 67 insertions(+), 40 deletions(-) diff --git a/dask/array/core.py b/dask/array/core.py index 80352444481..50fd325249a 100644 --- a/dask/array/core.py +++ b/dask/array/core.py @@ -54,6 +54,7 @@ ) from dask.blockwise import blockwise as core_blockwise from dask.blockwise import broadcast_dimensions +from dask.compatibility import _EMSCRIPTEN from dask.context import globalmethod from dask.core import quote from dask.delayed import Delayed, delayed @@ -84,14 +85,19 @@ ) from dask.widgets import get_template -try: - from dask import threaded -except (ImportError, ModuleNotFoundError): +if _EMSCRIPTEN: from dask import local DEFAULT_GET = local.get_sync else: - DEFAULT_GET = threaded.get + try: + from dask import threaded + except ImportError: + from dask import local + + DEFAULT_GET = local.get_sync + else: + DEFAULT_GET = threaded.get config.update_defaults({"array": {"chunk-size": "128MiB", "rechunk-threshold": 4}}) diff --git a/dask/bag/core.py b/dask/bag/core.py index 3b00f7f43f9..8c3fd1d78d5 100644 --- a/dask/bag/core.py +++ b/dask/bag/core.py @@ -42,6 +42,7 @@ from dask.bag.avro import to_avro from dask.base import DaskMethodsMixin, dont_optimize, replace_name_in_key, tokenize from dask.blockwise import blockwise +from dask.compatibility import _EMSCRIPTEN from dask.context import globalmethod from dask.core import flatten, get_dependencies, istask, quote, reverse_dict from dask.delayed import Delayed, unpack_collections @@ -63,14 +64,19 @@ takes_multiple_arguments, ) -try: - from dask import multiprocessing -except (ImportError, ModuleNotFoundError): +if _EMSCRIPTEN: from dask import local DEFAULT_GET = local.get_sync else: - DEFAULT_GET = multiprocessing.get + try: + from dask import multiprocessing + except ImportError: + from dask import local + + DEFAULT_GET = local.get_sync + else: + DEFAULT_GET = multiprocessing.get no_default = "__no__default__" no_result = type( diff --git a/dask/base.py b/dask/base.py index 7d2b16138b1..24e078bcf59 100644 --- a/dask/base.py +++ b/dask/base.py @@ -23,7 +23,7 @@ from tlz.functoolz import Compose from dask import config, local -from dask.compatibility import _PY_VERSION +from dask.compatibility import _EMSCRIPTEN, _PY_VERSION from dask.context import thread_state from dask.core import flatten from dask.core import get as simple_get @@ -1291,29 +1291,30 @@ def _colorize(t): "single-threaded": local.get_sync, } -try: - from dask import threaded -except (ImportError, ModuleNotFoundError): - pass -else: - named_schedulers.update( - { - "threads": threaded.get, - "threading": threaded.get, - } - ) +if not _EMSCRIPTEN: + try: + from dask import threaded + except ImportError: + pass + else: + named_schedulers.update( + { + "threads": threaded.get, + "threading": threaded.get, + } + ) -try: - from dask import multiprocessing as dask_multiprocessing -except (ImportError, ModuleNotFoundError): - pass -else: - named_schedulers.update( - { - "processes": dask_multiprocessing.get, - "multiprocessing": dask_multiprocessing.get, - } - ) + try: + from dask import multiprocessing as dask_multiprocessing + except ImportError: + pass + else: + named_schedulers.update( + { + "processes": dask_multiprocessing.get, + "multiprocessing": dask_multiprocessing.get, + } + ) get_err_msg = """ 
diff --git a/dask/compatibility.py b/dask/compatibility.py index 2c8313413c2..b156f60d0f3 100644 --- a/dask/compatibility.py +++ b/dask/compatibility.py @@ -3,3 +3,5 @@ from packaging.version import parse as parse_version _PY_VERSION = parse_version(".".join(map(str, sys.version_info[:3]))) + +_EMSCRIPTEN = sys.platform == "emscripten" diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index d96eba1b0d2..64fc7acb312 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -25,6 +25,7 @@ from dask.bag import map_partitions as map_bag_partitions from dask.base import DaskMethodsMixin, dont_optimize, is_dask_collection, tokenize from dask.blockwise import Blockwise, BlockwiseDep, BlockwiseDepDict, blockwise +from dask.compatibility import _EMSCRIPTEN from dask.context import globalmethod from dask.dataframe import methods from dask.dataframe._compat import PANDAS_GT_140, PANDAS_GT_150 @@ -79,14 +80,19 @@ ) from dask.widgets import get_template -try: - from dask import threaded -except (ImportError, ModuleNotFoundError): +if _EMSCRIPTEN: from dask import local DEFAULT_GET = local.get_sync else: - DEFAULT_GET = threaded.get + try: + from dask import threaded + except ImportError: + from dask import local + + DEFAULT_GET = local.get_sync + else: + DEFAULT_GET = threaded.get no_default = "__no_default__" diff --git a/dask/dataframe/io/hdf.py b/dask/dataframe/io/hdf.py index 7622cab758e..2669cf62cf6 100644 --- a/dask/dataframe/io/hdf.py +++ b/dask/dataframe/io/hdf.py @@ -18,7 +18,7 @@ try: from dask import multiprocessing -except (ImportError, ModuleNotFoundError): +except ImportError: mpget = object() else: mpget = multiprocessing.get diff --git a/dask/delayed.py b/dask/delayed.py index 36f3f3bd900..e0c2d5f3111 100644 --- a/dask/delayed.py +++ b/dask/delayed.py @@ -15,6 +15,7 @@ replace_name_in_key, ) from dask.base import tokenize as _tokenize +from dask.compatibility import _EMSCRIPTEN from dask.context import globalmethod from dask.core import flatten, quote from dask.highlevelgraph import HighLevelGraph @@ -23,14 +24,19 @@ __all__ = ["Delayed", "delayed"] -try: - from dask import threaded -except (ImportError, ModuleNotFoundError): +if _EMSCRIPTEN: from dask import local DEFAULT_GET = local.get_sync else: - DEFAULT_GET = threaded.get + try: + from dask import threaded + except ImportError: + from dask import local + + DEFAULT_GET = local.get_sync + else: + DEFAULT_GET = threaded.get def unzip(ls, nout): From 5fec2bc157020e80e05530514cf1cb7a3b28aeaf Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Fri, 20 May 2022 07:12:46 -0700 Subject: [PATCH 06/14] Add ipycytoscape --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 1f671d146a7..0b9bd036cea 100755 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ "jinja2", ], "delayed": [], # keeping for backwards compatibility - "pyodide": ["jinja2"], + "pyodide": ["ipycytoscape", "jinja2"], } extras_require["complete"] = sorted({v for req in extras_require.values() for v in req}) # after complete is set, add in test From 923df9532a084e30d3f43a93560b698a1e4b0dc5 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 8 Jun 2022 16:38:13 -0700 Subject: [PATCH 07/14] Add test for different scheduler depending on platform. 
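
The check has to run in a subprocess: a collection picks its
__dask_scheduler__ when its module is first imported, so patching the
platform in an interpreter that has already imported dask would be too late.
A condensed sketch of the strategy, assuming dask.array and its dependencies
are installed (the diff below parametrizes this over dataframe, array, and
bag, and a later patch swaps the sys.platform mock for dask's own flag):

    import inspect
    import subprocess
    import sys

    def check(platform):
        # Runs in a fresh interpreter: patch the platform, then import.
        from unittest import mock

        with mock.patch("sys.platform", platform):
            import dask.array
            from dask.local import get_sync

            assert dask.array.Array.__dask_scheduler__ == get_sync

    # Ship the function's source to a clean interpreter and call it there.
    code = inspect.getsource(check) + "check('emscripten')\n"
    subprocess.run([sys.executable, "-c", code]).check_returncode()
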
--- dask/dataframe/io/csv.py | 2 +- dask/system.py | 2 +- dask/tests/test_base.py | 47 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/dask/dataframe/io/csv.py b/dask/dataframe/io/csv.py index 794766cbad7..705e9d33172 100644 --- a/dask/dataframe/io/csv.py +++ b/dask/dataframe/io/csv.py @@ -5,7 +5,7 @@ try: import psutil -except ImportError: +except (ImportError, NotImplementedError): psutil = None import fsspec.implementations.local diff --git a/dask/system.py b/dask/system.py index 9d9a693a522..774eb19f235 100644 --- a/dask/system.py +++ b/dask/system.py @@ -4,7 +4,7 @@ try: import psutil -except ImportError: +except (ImportError, NotImplementedError): psutil = None __all__ = ("cpu_count", "CPU_COUNT") diff --git a/dask/tests/test_base.py b/dask/tests/test_base.py index 2c518cb2a98..d8f1e3ad769 100644 --- a/dask/tests/test_base.py +++ b/dask/tests/test_base.py @@ -1,5 +1,6 @@ import dataclasses import datetime +import inspect import os import subprocess import sys @@ -1539,3 +1540,49 @@ def __dask_optimize__(cls, dsk, keys, **kwargs): )[0] assert optimized da.utils.assert_eq(x, result) + + +# A function designed to be run in a subprocess with sys.platform patched. +# This allows for checking for different default schedulers depending on the +# platform, particularly emscripten +def check_default_scheduler(module, collection, expected, platform): + from unittest import mock + + with mock.patch("sys.platform", platform): + import importlib + + if expected == "sync": + from dask.local import get_sync as get + elif expected == "threads": + from dask.threaded import get + elif expected == "processes": + from dask.multiprocessing import get + + mod = importlib.import_module(module) + + assert getattr(mod, collection).__dask_scheduler__ == get + + +@pytest.mark.parametrize( + "params", + ( + "'dask.dataframe', '_Frame', 'sync', 'emscripten'", + f"'dask.dataframe', '_Frame', 'threads', '{sys.platform}'", + "'dask.array', 'Array', 'sync', 'emscripten'", + f"'dask.array', 'Array', 'threads', '{sys.platform}'", + "'dask.bag', 'Bag', 'sync', 'emscripten'", + f"'dask.bag', 'Bag', 'processes', '{sys.platform}'", + ), +) +def test_emscripten_default_scheduler(params): + proc = subprocess.run( + [ + sys.executable, + "-c", + ( + inspect.getsource(check_default_scheduler) + + f"check_default_scheduler({params})\n" + ), + ] + ) + proc.check_returncode() From 6d1a9e9cdf4aab9ca4a5d33b65580e4635cba7fc Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 8 Jun 2022 16:47:46 -0700 Subject: [PATCH 08/14] Simplify default scheduler logic --- dask/array/core.py | 9 ++------- dask/bag/core.py | 9 ++------- dask/base.py | 38 ++++++++++++++++---------------------- dask/dataframe/core.py | 9 ++------- dask/delayed.py | 9 ++------- 5 files changed, 24 insertions(+), 50 deletions(-) diff --git a/dask/array/core.py b/dask/array/core.py index 50fd325249a..0db2936b7d5 100644 --- a/dask/array/core.py +++ b/dask/array/core.py @@ -90,14 +90,9 @@ DEFAULT_GET = local.get_sync else: - try: - from dask import threaded - except ImportError: - from dask import local + from dask import threaded - DEFAULT_GET = local.get_sync - else: - DEFAULT_GET = threaded.get + DEFAULT_GET = threaded.get config.update_defaults({"array": {"chunk-size": "128MiB", "rechunk-threshold": 4}}) diff --git a/dask/bag/core.py b/dask/bag/core.py index 8c3fd1d78d5..3e7be196d99 100644 --- a/dask/bag/core.py +++ b/dask/bag/core.py @@ -69,14 +69,9 @@ DEFAULT_GET = local.get_sync else: - try: - from dask 
import multiprocessing - except ImportError: - from dask import local + from dask import multiprocessing - DEFAULT_GET = local.get_sync - else: - DEFAULT_GET = multiprocessing.get + DEFAULT_GET = multiprocessing.get no_default = "__no__default__" no_result = type( diff --git a/dask/base.py b/dask/base.py index 24e078bcf59..2e3e88aea12 100644 --- a/dask/base.py +++ b/dask/base.py @@ -1292,29 +1292,23 @@ def _colorize(t): } if not _EMSCRIPTEN: - try: - from dask import threaded - except ImportError: - pass - else: - named_schedulers.update( - { - "threads": threaded.get, - "threading": threaded.get, - } - ) + from dask import threaded - try: - from dask import multiprocessing as dask_multiprocessing - except ImportError: - pass - else: - named_schedulers.update( - { - "processes": dask_multiprocessing.get, - "multiprocessing": dask_multiprocessing.get, - } - ) + named_schedulers.update( + { + "threads": threaded.get, + "threading": threaded.get, + } + ) + + from dask import multiprocessing as dask_multiprocessing + + named_schedulers.update( + { + "processes": dask_multiprocessing.get, + "multiprocessing": dask_multiprocessing.get, + } + ) get_err_msg = """ diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 64fc7acb312..787041c8ba5 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -85,14 +85,9 @@ DEFAULT_GET = local.get_sync else: - try: - from dask import threaded - except ImportError: - from dask import local + from dask import threaded - DEFAULT_GET = local.get_sync - else: - DEFAULT_GET = threaded.get + DEFAULT_GET = threaded.get no_default = "__no_default__" diff --git a/dask/delayed.py b/dask/delayed.py index e0c2d5f3111..26de266dfcf 100644 --- a/dask/delayed.py +++ b/dask/delayed.py @@ -29,14 +29,9 @@ DEFAULT_GET = local.get_sync else: - try: - from dask import threaded - except ImportError: - from dask import local + from dask import threaded - DEFAULT_GET = local.get_sync - else: - DEFAULT_GET = threaded.get + DEFAULT_GET = threaded.get def unzip(ls, nout): From 326e0b2c39ddaed02116c99c62cf5a494c8bb57a Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 8 Jun 2022 17:10:42 -0700 Subject: [PATCH 09/14] Importorskip --- dask/tests/test_base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dask/tests/test_base.py b/dask/tests/test_base.py index d8f1e3ad769..425f80f3d0d 100644 --- a/dask/tests/test_base.py +++ b/dask/tests/test_base.py @@ -1575,6 +1575,8 @@ def check_default_scheduler(module, collection, expected, platform): ), ) def test_emscripten_default_scheduler(params): + pytest.importorskip("dask.array") + pytest.importorskip("dask.dataframe") proc = subprocess.run( [ sys.executable, From d9c9a290d9131ed0dc2b74d46086b27d55fc24ca Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Thu, 9 Jun 2022 09:04:42 -0700 Subject: [PATCH 10/14] Avoid mocking sys.platform in subprocess --- dask/dataframe/io/csv.py | 2 +- dask/system.py | 2 +- dask/tests/test_base.py | 23 ++++++++++++----------- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/dask/dataframe/io/csv.py b/dask/dataframe/io/csv.py index 705e9d33172..794766cbad7 100644 --- a/dask/dataframe/io/csv.py +++ b/dask/dataframe/io/csv.py @@ -5,7 +5,7 @@ try: import psutil -except (ImportError, NotImplementedError): +except ImportError: psutil = None import fsspec.implementations.local diff --git a/dask/system.py b/dask/system.py index 774eb19f235..9d9a693a522 100644 --- a/dask/system.py +++ b/dask/system.py @@ -4,7 +4,7 @@ try: import psutil -except (ImportError, NotImplementedError): 
+except ImportError: psutil = None __all__ = ("cpu_count", "CPU_COUNT") diff --git a/dask/tests/test_base.py b/dask/tests/test_base.py index 425f80f3d0d..4972b7606c1 100644 --- a/dask/tests/test_base.py +++ b/dask/tests/test_base.py @@ -1542,13 +1542,14 @@ def __dask_optimize__(cls, dsk, keys, **kwargs): da.utils.assert_eq(x, result) -# A function designed to be run in a subprocess with sys.platform patched. -# This allows for checking for different default schedulers depending on the -# platform, particularly emscripten -def check_default_scheduler(module, collection, expected, platform): +# A function designed to be run in a subprocess with dask.compatibility._EMSCRIPTEN +# patched. This allows for checking for different default schedulers depending on the +# platform. One might prefer patching `sys.platform` for a more direct test, but that +# causes problems in other libraries. +def check_default_scheduler(module, collection, expected, emscripten): from unittest import mock - with mock.patch("sys.platform", platform): + with mock.patch("dask.compatibility._EMSCRIPTEN", emscripten): import importlib if expected == "sync": @@ -1566,12 +1567,12 @@ def check_default_scheduler(module, collection, expected, platform): @pytest.mark.parametrize( "params", ( - "'dask.dataframe', '_Frame', 'sync', 'emscripten'", - f"'dask.dataframe', '_Frame', 'threads', '{sys.platform}'", - "'dask.array', 'Array', 'sync', 'emscripten'", - f"'dask.array', 'Array', 'threads', '{sys.platform}'", - "'dask.bag', 'Bag', 'sync', 'emscripten'", - f"'dask.bag', 'Bag', 'processes', '{sys.platform}'", + "'dask.dataframe', '_Frame', 'sync', True", + "'dask.dataframe', '_Frame', 'threads', False", + "'dask.array', 'Array', 'sync', True", + "'dask.array', 'Array', 'threads', False", + "'dask.bag', 'Bag', 'sync', True", + "'dask.bag', 'Bag', 'processes', False", ), ) def test_emscripten_default_scheduler(params): From 6900f209141ddf5aecd2b07ac72e4cfd5d7a6ab2 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Thu, 9 Jun 2022 11:24:45 -0700 Subject: [PATCH 11/14] Revert position only in protocol --- dask/local.py | 2 +- dask/multiprocessing.py | 1 - dask/threaded.py | 7 +++---- dask/typing.py | 1 - 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/dask/local.py b/dask/local.py index 8580b38d513..a0ac53dd1de 100644 --- a/dask/local.py +++ b/dask/local.py @@ -548,7 +548,7 @@ def submit(self, fn, *args, **kwargs): synchronous_executor = SynchronousExecutor() -def get_sync(dsk: Mapping, keys: Sequence[Hashable] | Hashable, /, **kwargs): +def get_sync(dsk: Mapping, keys: Sequence[Hashable] | Hashable, **kwargs): """A naive synchronous version of get_async Can be useful for debugging. 
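
Note: with the `/` removed, the parameters are positional-or-keyword again,
so their names become part of the SchedulerGetCallable contract; that is
presumably why the diffs below also rename threaded.get's second parameter
from `result` to `keys` rather than leave the names inconsistent. A minimal
sketch of what conformance looks like without positional-only markers:

    from __future__ import annotations

    from collections.abc import Hashable, Mapping, Sequence
    from typing import Any, Protocol

    class SchedulerGetCallable(Protocol):
        def __call__(
            self, dsk: Mapping, keys: Sequence[Hashable] | Hashable, **kwargs: Any
        ) -> Any:
            ...

    def get(dsk: Mapping, keys, **kwargs):
        """A do-nothing scheduler used only to illustrate conformance."""

    # Parameter names match the protocol, so keyword calls type-check too.
    sched: SchedulerGetCallable = get
    sched(dsk={}, keys=[])
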
diff --git a/dask/multiprocessing.py b/dask/multiprocessing.py index b6c9d418b99..137ae5c34a1 100644 --- a/dask/multiprocessing.py +++ b/dask/multiprocessing.py @@ -146,7 +146,6 @@ def get_context(): def get( dsk: Mapping, keys: Sequence[Hashable] | Hashable, - /, num_workers=None, func_loads=None, func_dumps=None, diff --git a/dask/threaded.py b/dask/threaded.py index c2951b66b3e..b1fadf1866d 100644 --- a/dask/threaded.py +++ b/dask/threaded.py @@ -35,8 +35,7 @@ def pack_exception(e, dumps): def get( dsk: Mapping, - result: Sequence[Hashable] | Hashable, - /, + keys: Sequence[Hashable] | Hashable, cache=None, num_workers=None, pool=None, @@ -49,7 +48,7 @@ def get( dsk: dict A dask dictionary specifying a workflow - result: key or list of keys + keys: key or list of keys Keys corresponding to desired data num_workers: integer of thread count The number of threads to use in the ThreadPool that will actually execute tasks @@ -91,7 +90,7 @@ def get( pool.submit, pool._max_workers, dsk, - result, + keys, cache=cache, get_id=_thread_get_id, pack_exception=pack_exception, diff --git a/dask/typing.py b/dask/typing.py index c29ad384085..ec2c23d07ed 100644 --- a/dask/typing.py +++ b/dask/typing.py @@ -22,7 +22,6 @@ def __call__( self, dsk: Mapping, keys: Sequence[Hashable] | Hashable, - /, **kwargs: Any, ) -> Any: """Method called as the default scheduler for a collection. From 1472d71081a6925b4d70b35e7a1c7d888a443a5d Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 15 Jun 2022 08:53:39 -0700 Subject: [PATCH 12/14] Remove pyodide extras --- setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.py b/setup.py index 0b9bd036cea..cbe669e8ce1 100755 --- a/setup.py +++ b/setup.py @@ -21,7 +21,6 @@ "jinja2", ], "delayed": [], # keeping for backwards compatibility - "pyodide": ["ipycytoscape", "jinja2"], } extras_require["complete"] = sorted({v for req in extras_require.values() for v in req}) # after complete is set, add in test From 78766adc45bb84d03276967188eab7b3d25f88be Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 15 Jun 2022 11:32:37 -0700 Subject: [PATCH 13/14] Use named scheduler dict to get default --- dask/array/core.py | 11 ++--------- dask/bag/core.py | 18 ++++++++---------- dask/dataframe/core.py | 18 ++++++++---------- dask/delayed.py | 11 ++--------- dask/tests/test_base.py | 9 ++++++++- 5 files changed, 28 insertions(+), 39 deletions(-) diff --git a/dask/array/core.py b/dask/array/core.py index 0db2936b7d5..b24444058e9 100644 --- a/dask/array/core.py +++ b/dask/array/core.py @@ -49,12 +49,12 @@ compute_as_if_collection, dont_optimize, is_dask_collection, + named_schedulers, persist, tokenize, ) from dask.blockwise import blockwise as core_blockwise from dask.blockwise import broadcast_dimensions -from dask.compatibility import _EMSCRIPTEN from dask.context import globalmethod from dask.core import quote from dask.delayed import Delayed, delayed @@ -85,14 +85,7 @@ ) from dask.widgets import get_template -if _EMSCRIPTEN: - from dask import local - - DEFAULT_GET = local.get_sync -else: - from dask import threaded - - DEFAULT_GET = threaded.get +DEFAULT_GET = named_schedulers.get("threads", named_schedulers["sync"]) config.update_defaults({"array": {"chunk-size": "128MiB", "rechunk-threshold": 4}}) diff --git a/dask/bag/core.py b/dask/bag/core.py index 3e7be196d99..f9ecfdfce9f 100644 --- a/dask/bag/core.py +++ b/dask/bag/core.py @@ -40,9 +40,14 @@ from dask import config from dask.bag import chunk from dask.bag.avro import to_avro -from dask.base import DaskMethodsMixin, 
dont_optimize, replace_name_in_key, tokenize +from dask.base import ( + DaskMethodsMixin, + dont_optimize, + named_schedulers, + replace_name_in_key, + tokenize, +) from dask.blockwise import blockwise -from dask.compatibility import _EMSCRIPTEN from dask.context import globalmethod from dask.core import flatten, get_dependencies, istask, quote, reverse_dict from dask.delayed import Delayed, unpack_collections @@ -64,14 +69,7 @@ takes_multiple_arguments, ) -if _EMSCRIPTEN: - from dask import local - - DEFAULT_GET = local.get_sync -else: - from dask import multiprocessing - - DEFAULT_GET = multiprocessing.get +DEFAULT_GET = named_schedulers.get("processes", named_schedulers["sync"]) no_default = "__no__default__" no_result = type( diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 787041c8ba5..11dd1f3c8ad 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -23,9 +23,14 @@ from dask import core from dask.array.core import Array, normalize_arg from dask.bag import map_partitions as map_bag_partitions -from dask.base import DaskMethodsMixin, dont_optimize, is_dask_collection, tokenize +from dask.base import ( + DaskMethodsMixin, + dont_optimize, + is_dask_collection, + named_schedulers, + tokenize, +) from dask.blockwise import Blockwise, BlockwiseDep, BlockwiseDepDict, blockwise -from dask.compatibility import _EMSCRIPTEN from dask.context import globalmethod from dask.dataframe import methods from dask.dataframe._compat import PANDAS_GT_140, PANDAS_GT_150 @@ -80,14 +85,7 @@ ) from dask.widgets import get_template -if _EMSCRIPTEN: - from dask import local - - DEFAULT_GET = local.get_sync -else: - from dask import threaded - - DEFAULT_GET = threaded.get +DEFAULT_GET = named_schedulers.get("threads", named_schedulers["sync"]) no_default = "__no_default__" diff --git a/dask/delayed.py b/dask/delayed.py index 26de266dfcf..8fc99aad194 100644 --- a/dask/delayed.py +++ b/dask/delayed.py @@ -12,10 +12,10 @@ DaskMethodsMixin, dont_optimize, is_dask_collection, + named_schedulers, replace_name_in_key, ) from dask.base import tokenize as _tokenize -from dask.compatibility import _EMSCRIPTEN from dask.context import globalmethod from dask.core import flatten, quote from dask.highlevelgraph import HighLevelGraph @@ -24,14 +24,7 @@ __all__ = ["Delayed", "delayed"] -if _EMSCRIPTEN: - from dask import local - - DEFAULT_GET = local.get_sync -else: - from dask import threaded - - DEFAULT_GET = threaded.get +DEFAULT_GET = named_schedulers.get("threads", named_schedulers["sync"]) def unzip(ls, nout): diff --git a/dask/tests/test_base.py b/dask/tests/test_base.py index 4972b7606c1..f310d70db20 100644 --- a/dask/tests/test_base.py +++ b/dask/tests/test_base.py @@ -1547,9 +1547,16 @@ def __dask_optimize__(cls, dsk, keys, **kwargs): # platform. One might prefer patching `sys.platform` for a more direct test, but that # causes problems in other libraries. 
def check_default_scheduler(module, collection, expected, emscripten): + from contextlib import nullcontext from unittest import mock - with mock.patch("dask.compatibility._EMSCRIPTEN", emscripten): + from dask.local import get_sync + + if emscripten: + ctx = mock.patch("dask.base.named_schedulers", {"sync": get_sync}) + else: + ctx = nullcontext() + with ctx: import importlib if expected == "sync": From 84390b7f24da55d7890400d216ffec32d10d4a40 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 15 Jun 2022 13:05:00 -0700 Subject: [PATCH 14/14] one fewer importerror --- dask/dataframe/io/hdf.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dask/dataframe/io/hdf.py b/dask/dataframe/io/hdf.py index 2669cf62cf6..d972db5b247 100644 --- a/dask/dataframe/io/hdf.py +++ b/dask/dataframe/io/hdf.py @@ -9,19 +9,19 @@ from tlz import merge from dask import config -from dask.base import compute_as_if_collection, get_scheduler, tokenize +from dask.base import ( + compute_as_if_collection, + get_scheduler, + named_schedulers, + tokenize, +) from dask.dataframe.core import DataFrame from dask.dataframe.io.io import _link, from_map from dask.dataframe.io.utils import DataFrameIOFunction from dask.delayed import Delayed, delayed from dask.utils import get_scheduler_lock -try: - from dask import multiprocessing -except ImportError: - mpget = object() -else: - mpget = multiprocessing.get +MP_GET = named_schedulers.get("processes", object()) def _pd_to_hdf(pd_to_hdf, lock, args, kwargs=None): @@ -200,7 +200,7 @@ def to_hdf( if lock is None: if not single_node: lock = True - elif not single_file and _actual_get is not mpget: + elif not single_file and _actual_get is not MP_GET: # if we're writing to multiple files with the multiprocessing # scheduler we don't need to lock lock = True
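
After the final patch, collections no longer import threaded or
multiprocessing directly: they look their default up in
dask.base.named_schedulers, which contains only the schedulers the platform
could actually import. A usage sketch of the resulting pattern on a regular
CPython install:

    from dask.base import named_schedulers

    # Falls back to the synchronous scheduler when threads are unavailable
    # (on Emscripten/Pyodide only the "sync"-style entries are registered).
    DEFAULT_GET = named_schedulers.get("threads", named_schedulers["sync"])

    # "dask.threaded" here; "dask.local" on Pyodide.
    print(DEFAULT_GET.__module__)
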