Skip to content

Commit

Permalink
Add background task to assign new workers
Browse files Browse the repository at this point in the history
  • Loading branch information
DrChainsaw committed Oct 3, 2020
1 parent 0e50a79 commit 29bd421
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions src/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,20 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
node_order = x -> -get(ord, x, 0)
state = start_state(deps, node_order)
# start off some tasks
# Note: worker_state may be different things for different contexts. Don't touch it out here!
procs_state = assign_new_procs!(ctx, state, chan, node_order)
@dbg timespan_end(ctx, :scheduler_init, 0, master)

# Check periodically for new workers in a parallel task so that we don't accidentally end up
# having to wait for 'take!(chan)' on some large task before new workers are put to work
newprocs_lock = ReentrantLock()
@async while !isempty(state.ready) || !isempty(state.running)
sleep(1)
procs_state = lock(newprocs_lock) do
assign_new_procs!(ctx, state, chan, node_order, procs_state)
end
end

# Loop while we still have thunks to execute
while !isempty(state.ready) || !isempty(state.running)
if isempty(state.running) && !isempty(state.ready)
Expand All @@ -105,8 +116,9 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
end
end

# Note: worker_state may be different things for different contexts. Don't touch it out here!
procs_state = assign_new_procs!(ctx, state, chan, node_order, procs_state)
procs_state = lock(newprocs_lock) do
assign_new_procs!(ctx, state, chan, node_order, procs_state)
end

if isempty(state.running)
# the block above fired only meta tasks
Expand Down

0 comments on commit 29bd421

Please sign in to comment.