Skip to content

Commit

Permalink
Drop oversubscription logic
Browse files Browse the repository at this point in the history
Since `concurrent.futures` already limits the number of concurrently running
tasks to the pool's worker count, our own oversubscription-avoidance logic is
redundant. Drop that logic from the local scheduler and rely on
`concurrent.futures` instead.

Note that it is still possible to configure pools with specific numbers of
workers; dropping the oversubscription logic does not affect that. Instead,
the scheduler now simply submits every ready task until no ready tasks remain,
leaving throttling to the executor itself.
  • Loading branch information
jakirkham committed Mar 9, 2021
1 parent e52bbbd commit 9d0aaa2
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 10 deletions.
12 changes: 4 additions & 8 deletions dask/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ def identity(x):

def get_async(
apply_async,
num_workers,
dsk,
result,
cache=None,
Expand All @@ -379,8 +378,6 @@ def get_async(
----------
apply_async : function
Asynchronous apply function as found on Pool or ThreadPool
num_workers : int
The number of active tasks we should have at any one time
dsk : dict
A dask dictionary specifying a workflow
result : key or list of keys
Expand Down Expand Up @@ -475,8 +472,8 @@ def fire_task():
callback=queue.put,
)

# Seed initial tasks into the thread pool
while state["ready"] and len(state["running"]) < num_workers:
# Seed initial tasks into the pool
while state["ready"]:
fire_task()

# Main loop, wait on tasks to finish, insert new ones
Expand All @@ -499,7 +496,7 @@ def fire_task():
for f in posttask_cbs:
f(key, res, dsk, state, worker_id)

while state["ready"] and len(state["running"]) < num_workers:
while state["ready"]:
fire_task()

succeeded = True
Expand Down Expand Up @@ -543,8 +540,7 @@ def get_sync(dsk, keys, **kwargs):
Can be useful for debugging.
"""
kwargs.pop("num_workers", None) # if num_workers present, remove it
return get_async(apply_sync, 1, dsk, keys, **kwargs)
return get_async(apply_sync, dsk, keys, **kwargs)


def sortkey(item):
Expand Down
1 change: 0 additions & 1 deletion dask/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ def get(
# Run
result = get_async(
partial(multiprocessing_apply_async, pool),
pool._max_workers,
dsk3,
keys,
get_id=_process_get_id,
Expand Down
1 change: 0 additions & 1 deletion dask/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def get(dsk, result, cache=None, num_workers=None, pool=None, **kwargs):

results = get_async(
partial(executor_apply_async, pool),
pool._max_workers,
dsk,
result,
cache=cache,
Expand Down

0 comments on commit 9d0aaa2

Please sign in to comment.