Skip to content

Commit

Permalink
Merge branch 'main' of github.com:dask/distributed into event-loop-da…
Browse files Browse the repository at this point in the history
…shboard
  • Loading branch information
mrocklin committed Mar 24, 2022
2 parents b780de3 + 5c7d555 commit 55e8090
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 45 deletions.
37 changes: 19 additions & 18 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,7 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier):
if not idle:
break

if _has_restrictions(ts):
thieves = [ws for ws in idle if _can_steal(ws, ts, sat)]
else:
thieves = idle
thieves = _potential_thieves_for(ts, idle)
if not thieves:
break
thief = thieves[i % len(thieves)]
Expand Down Expand Up @@ -451,10 +448,7 @@ def maybe_move_task(level, ts, sat, idl, duration, cost_multiplier):
continue

i += 1
if _has_restrictions(ts):
thieves = [ws for ws in idle if _can_steal(ws, ts, sat)]
else:
thieves = idle
thieves = _potential_thieves_for(ts, idle)
if not thieves:
continue
thief = thieves[i % len(thieves)]
Expand Down Expand Up @@ -492,18 +486,16 @@ def story(self, *keys):
return out


def _has_restrictions(ts):
"""Determine whether the given task has restrictions and whether these
restrictions are strict.
"""
return not ts.loose_restrictions and (
ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
)
def _potential_thieves_for(ts, idle):
"""Return the list of workers from ``idle`` that could steal ``ts``."""
if _has_restrictions(ts):
return [ws for ws in idle if _can_steal(ws, ts)]
else:
return idle


def _can_steal(thief, ts, victim):
"""Determine whether worker ``thief`` can steal task ``ts`` from worker
``victim``.
def _can_steal(thief, ts):
"""Determine whether worker ``thief`` can steal task ``ts``.
Assumes that `ts` has some restrictions.
"""
Expand All @@ -529,4 +521,13 @@ def _can_steal(thief, ts, victim):
return True


def _has_restrictions(ts):
"""Determine whether the given task has restrictions and whether these
restrictions are strict.
"""
return not ts.loose_restrictions and (
ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
)


fast_tasks = {"split-shuffle"}
4 changes: 2 additions & 2 deletions distributed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ async def test_counters():
assert c["op"].components[0] == {"identity": 2, "div": 1}


@gen_cluster()
@gen_cluster(config={"distributed.admin.tick.interval": "20 ms"})
async def test_ticks(s, a, b):
pytest.importorskip("crick")
await asyncio.sleep(0.1)
Expand All @@ -770,7 +770,7 @@ async def test_ticks(s, a, b):
assert 0.01 < c.components[0].quantile(0.5) < 0.5


@gen_cluster()
@gen_cluster(config={"distributed.admin.tick.interval": "20 ms"})
async def test_tick_logging(s, a, b):
pytest.importorskip("crick")
from distributed import core
Expand Down
15 changes: 1 addition & 14 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,21 +1124,8 @@ def color_of(x, palette=palette):
return palette[n % len(palette)]


def _iscoroutinefunction(f):
return inspect.iscoroutinefunction(f) or gen.is_coroutine_function(f)


@functools.lru_cache(None)
def _iscoroutinefunction_cached(f):
return _iscoroutinefunction(f)


def iscoroutinefunction(f):
# Attempt to use lru_cache version and fall back to non-cached version if needed
try:
return _iscoroutinefunction_cached(f)
except TypeError: # unhashable type
return _iscoroutinefunction(f)
return inspect.iscoroutinefunction(f) or gen.is_coroutine_function(f)


@contextmanager
Expand Down
19 changes: 12 additions & 7 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1770,13 +1770,18 @@ def clean(threads=not WINDOWS, instances=True, timeout=1, processes=True):
with check_active_rpc(loop, timeout):
reset_config()

dask.config.set({"distributed.comm.timeouts.connect": "5s"})
# Restore default logging levels
# XXX use pytest hooks/fixtures instead?
for name, level in logging_levels.items():
logging.getLogger(name).setLevel(level)

yield loop
with dask.config.set(
{
"distributed.comm.timeouts.connect": "5s",
"distributed.admin.tick.interval": "500 ms",
}
):
# Restore default logging levels
# XXX use pytest hooks/fixtures instead?
for name, level in logging_levels.items():
logging.getLogger(name).setLevel(level)

yield loop


@pytest.fixture
Expand Down
8 changes: 4 additions & 4 deletions docs/source/related-work.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ There are a couple of older projects that often get mentioned
* Pyro_: Remote objects / RPC

.. _Luigi: https://luigi.readthedocs.io/en/latest/
.. _MPI4Py: http://mpi4py.readthedocs.io/en/stable/
.. _MPI4Py: https://mpi4py.readthedocs.io/en/stable/
.. _PyZMQ: https://github.com/zeromq/pyzmq
.. _Celery: http://www.celeryproject.org/
.. _Celery: https://docs.celeryq.dev/
.. _`IPython Parallel`: https://ipyparallel.readthedocs.io/en/latest/
.. _Scoop: https://github.com/soravux/scoop/
.. _`concurrent.futures`: https://docs.python.org/3/library/concurrent.futures.html
.. _Dispy: http://dispy.sourceforge.net/
.. _Pyro: https://pythonhosted.org/Pyro4/
.. _Dispy: https://dispy.org/
.. _Pyro: https://pyro4.readthedocs.io/

Relationship
------------
Expand Down

0 comments on commit 55e8090

Please sign in to comment.