Skip to content

Commit

Permalink
Simplify default scheduler logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Rose committed Jun 8, 2022
1 parent 923df95 commit 6d1a9e9
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 50 deletions.
9 changes: 2 additions & 7 deletions dask/array/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,9 @@

DEFAULT_GET = local.get_sync
else:
try:
from dask import threaded
except ImportError:
from dask import local
from dask import threaded

DEFAULT_GET = local.get_sync
else:
DEFAULT_GET = threaded.get
DEFAULT_GET = threaded.get

config.update_defaults({"array": {"chunk-size": "128MiB", "rechunk-threshold": 4}})

Expand Down
9 changes: 2 additions & 7 deletions dask/bag/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,9 @@

DEFAULT_GET = local.get_sync
else:
try:
from dask import multiprocessing
except ImportError:
from dask import local
from dask import multiprocessing

DEFAULT_GET = local.get_sync
else:
DEFAULT_GET = multiprocessing.get
DEFAULT_GET = multiprocessing.get

no_default = "__no__default__"
no_result = type(
Expand Down
38 changes: 16 additions & 22 deletions dask/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,29 +1292,23 @@ def _colorize(t):
}

if not _EMSCRIPTEN:
try:
from dask import threaded
except ImportError:
pass
else:
named_schedulers.update(
{
"threads": threaded.get,
"threading": threaded.get,
}
)
from dask import threaded

try:
from dask import multiprocessing as dask_multiprocessing
except ImportError:
pass
else:
named_schedulers.update(
{
"processes": dask_multiprocessing.get,
"multiprocessing": dask_multiprocessing.get,
}
)
named_schedulers.update(
{
"threads": threaded.get,
"threading": threaded.get,
}
)

from dask import multiprocessing as dask_multiprocessing

named_schedulers.update(
{
"processes": dask_multiprocessing.get,
"multiprocessing": dask_multiprocessing.get,
}
)


get_err_msg = """
Expand Down
9 changes: 2 additions & 7 deletions dask/dataframe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,9 @@

DEFAULT_GET = local.get_sync
else:
try:
from dask import threaded
except ImportError:
from dask import local
from dask import threaded

DEFAULT_GET = local.get_sync
else:
DEFAULT_GET = threaded.get
DEFAULT_GET = threaded.get

no_default = "__no_default__"

Expand Down
9 changes: 2 additions & 7 deletions dask/delayed.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,9 @@

DEFAULT_GET = local.get_sync
else:
try:
from dask import threaded
except ImportError:
from dask import local
from dask import threaded

DEFAULT_GET = local.get_sync
else:
DEFAULT_GET = threaded.get
DEFAULT_GET = threaded.get


def unzip(ls, nout):
Expand Down

0 comments on commit 6d1a9e9

Please sign in to comment.