Skip to content

Commit

Permalink
Lock all launching of new tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
DrChainsaw committed Oct 4, 2020
1 parent 681795a commit 562a953
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions src/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())

# Check periodically for new workers in a parallel task so that we don't accidentally end up
# having to wait for 'take!(chan)' on some large task before new workers are put to work
newprocs_lock = ReentrantLock()
newtasks_lock = ReentrantLock()
@async while !isempty(state.ready) || !isempty(state.running)
sleep(1)
procs_state = lock(newprocs_lock) do
procs_state = lock(newtasks_lock) do
assign_new_procs!(ctx, state, chan, node_order, procs_state)
end
end
Expand All @@ -107,16 +107,18 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
while !isempty(state.ready) || !isempty(state.running)
if isempty(state.running) && !isempty(state.ready)
# Nothing running, so schedule up to N thunks, 1 per N workers
for p in procs_to_use(ctx)
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
fire_task!(ctx, task, p, state, chan, node_order)
lock(newtasks_lock) do
for p in procs_to_use(ctx)
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
fire_task!(ctx, task, p, state, chan, node_order)
end
end
end
end

procs_state = lock(newprocs_lock) do
procs_state = lock(newtasks_lock) do
assign_new_procs!(ctx, state, chan, node_order, procs_state)
end

Expand Down Expand Up @@ -147,9 +149,11 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
@dbg timespan_start(ctx, :scheduler, thunk_id, master)
immediate_next = finish_task!(state, node, node_order)
if !isempty(state.ready) && !shall_remove_proc(ctx, proc, immediate_next)
thunk = pop_with_affinity!(Context(procs_to_use(ctx)), state.ready, proc, immediate_next)
if thunk !== nothing
fire_task!(ctx, thunk, proc, state, chan, node_order)
lock(newtasks_lock) do
thunk = pop_with_affinity!(Context(procs_to_use(ctx)), state.ready, proc, immediate_next)
if thunk !== nothing
fire_task!(ctx, thunk, proc, state, chan, node_order)
end
end
end
@dbg timespan_end(ctx, :scheduler, thunk_id, master)
Expand Down

0 comments on commit 562a953

Please sign in to comment.