diff --git a/docs/src/processors.md b/docs/src/processors.md
index 470ac9865..a50fd4469 100644
--- a/docs/src/processors.md
+++ b/docs/src/processors.md
@@ -70,3 +70,48 @@ complicated detection and recovery process, including multiple
 master processes, a distributed and replicated database such as etcd, and
 checkpointing of the scheduler to ensure an efficient recovery. Such a system
 does not yet exist, but contributions for such a change are desired.
+
+## Dynamic worker pools
+
+Dagger's default scheduler supports modifying the worker pool while the
+scheduler is running. This is done by modifying the `Processor`s of the
+`Context` supplied to the scheduler at initialization using
+`addprocs!(ctx, ps)` and `rmprocs!(ctx, ps)`, where `ps` can be `Processor`s
+or just process ids.
+
+This is useful in HPC environments where the jobs that start up individual
+workers are queued, so not all workers are guaranteed to be available at the
+same time.
+
+New workers will typically be assigned new tasks as soon as the scheduler
+sees them. Removed workers will finish all their assigned tasks but will not
+be assigned any new tasks. Note that this makes it difficult to determine when
+a worker is no longer in use by Dagger.
+
+Example:
+
+```julia
+using Distributed
+
+ps1 = addprocs(2, exeflags="--project");
+@everywhere using Distributed, Dagger
+
+# Dummy task that waits for 0.5 seconds and then returns the id of its worker
+ts = delayed(vcat)((delayed(i -> (sleep(0.5); myid()))(i) for i in 1:20)...);
+
+ctx = Context();
+# The scheduler is blocking, so we need a new task to add workers while it runs
+job = @async collect(ctx, ts);
+
+# Let's fire up some new workers
+ps2 = addprocs(2, exeflags="--project");
+@everywhere using Distributed, Dagger
+# New workers are not available until we do this
+addprocs!(ctx, ps2)
+
+# Let's hope the job didn't complete before the workers were added :)
+fetch(job) |> unique
+
+# and clean up after ourselves...
+workers() |> rmprocs
+```
diff --git a/src/fault-handler.jl b/src/fault-handler.jl
index f0996111d..eb688dbf7 100644
--- a/src/fault-handler.jl
+++ b/src/fault-handler.jl
@@ -124,6 +124,9 @@ function handle_fault(ctx, state, thunk, oldproc, chan, node_order)
 
     # Reschedule inputs from deadlist
     newproc = OSProc(rand(workers()))
+    if newproc ∉ procs(ctx)
+        addprocs!(ctx, [newproc])
+    end
     while length(deadlist) > 0
         dt = popfirst!(deadlist)
         if any((input in deadlist) for input in dt.inputs)
diff --git a/src/processor.jl b/src/processor.jl
index 77bab713a..bad64ad69 100644
--- a/src/processor.jl
+++ b/src/processor.jl
@@ -1,4 +1,4 @@
-export OSProc, Context
+export OSProc, Context, addprocs!, rmprocs!
 
 """
     Processor
@@ -238,6 +238,7 @@ mutable struct Context
     log_sink::Any
     profile::Bool
     options
+    proc_lock::ReentrantLock
 end
 
 """
@@ -256,12 +257,15 @@ as a `Vector{Int}`.
 function Context(xs)
     Context(xs, NoOpLog(), false, nothing) # By default don't log events
 end
+Context(xs, log_sink, profile, options) = Context(xs, log_sink, profile, options, ReentrantLock())
 Context(xs::Vector{Int}) = Context(map(OSProc, xs))
 function Context()
     procs = [OSProc(w) for w in workers()]
     Context(procs)
 end
-procs(ctx::Context) = ctx.procs
+procs(ctx::Context) = lock(ctx) do
+    copy(ctx.procs)
+end
 
 """
     write_event(ctx::Context, event::Event)
@@ -271,3 +275,38 @@ Write a log event
 function write_event(ctx::Context, event::Event)
     write_event(ctx.log_sink, event)
 end
+
+"""
+    lock(f, ctx::Context)
+
+Acquire `ctx.proc_lock`, execute `f` with the lock held, and release the lock when `f` returns.
+"""
+Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock)
+
+"""
+    addprocs!(ctx::Context, xs)
+
+Add new workers `xs` to `ctx`.
+
+Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.
+
+Workers can be either `Processor`s or the underlying process ids as `Integer`s.
+"""
+addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs))
+addprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
+    append!(ctx.procs, xs)
+end
+"""
+    rmprocs!(ctx::Context, xs)
+
+Remove the specified workers `xs` from `ctx`.
+
+Workers will typically finish all their assigned tasks if scheduling is ongoing but will not be assigned new tasks after removal.
+
+Workers can be either `Processor`s or the underlying process ids as `Integer`s.
+"""
+rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs))
+rmprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
+    filter!(p -> p ∉ xs, ctx.procs)
+end
+
diff --git a/src/scheduler.jl b/src/scheduler.jl
index 9d1370386..363cd38e8 100644
--- a/src/scheduler.jl
+++ b/src/scheduler.jl
@@ -3,7 +3,7 @@ module Sch
 using Distributed
 import MemPool: DRef
 
-import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!
+import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs!, addprocs!
 
 include("fault-handler.jl")
 
@@ -82,12 +82,6 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
     master = OSProc(myid())
     @dbg timespan_start(ctx, :scheduler_init, 0, master)
 
-    if options.single !== 0
-        @assert options.single in vcat(1, workers()) "Sch option 'single' must specify an active worker id"
-        ps = OSProc[OSProc(options.single)]
-    else
-        ps = procs(ctx)
-    end
     chan = Channel{Any}(32)
     deps = dependents(d)
     ord = order(d, noffspring(deps))
@@ -95,20 +89,25 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
     node_order = x -> -get(ord, x, 0)
     state = start_state(deps, node_order)
     # start off some tasks
-    for p in ps
-        isempty(state.ready) && break
-        task = pop_with_affinity!(ctx, state.ready, p, false)
-        if task !== nothing
-            fire_task!(ctx, task, p, state, chan, node_order)
+    # Note: procs_state may be different things for different contexts. Don't touch it out here!
+    procs_state = assign_new_procs!(ctx, state, chan, node_order)
+    @dbg timespan_end(ctx, :scheduler_init, 0, master)
+
+    # Check periodically for new workers in a parallel task so that we don't accidentally end up
+    # having to wait for 'take!(chan)' on some large task before new workers are put to work
+    newprocs_lock = ReentrantLock()
+    @async while !isempty(state.ready) || !isempty(state.running)
+        sleep(1)
+        procs_state = lock(newprocs_lock) do
+            assign_new_procs!(ctx, state, chan, node_order, procs_state)
         end
     end
-    @dbg timespan_end(ctx, :scheduler_init, 0, master)
 
     # Loop while we still have thunks to execute
     while !isempty(state.ready) || !isempty(state.running)
         if isempty(state.running) && !isempty(state.ready)
             # Nothing running, so schedule up to N thunks, 1 per N workers
-            for p in ps
+            for p in procs_to_use(ctx)
                 isempty(state.ready) && break
                 task = pop_with_affinity!(ctx, state.ready, p, false)
                 if task !== nothing
@@ -117,6 +116,10 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
                 end
             end
 
+            procs_state = lock(newprocs_lock) do
+                assign_new_procs!(ctx, state, chan, node_order, procs_state)
+            end
+
             if isempty(state.running)
                 # the block above fired only meta tasks
                 continue
@@ -128,8 +131,8 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
             @warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work"
 
             # Remove dead worker from procs list
-            filter!(p->p.pid!=proc.pid, ctx.procs)
-            ps = procs(ctx)
+            # Not sure what the desired behaviour is if options.single is set...
+            rmprocs!(ctx, [proc])
 
             handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan, node_order)
             continue
@@ -143,8 +146,8 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
 
         @dbg timespan_start(ctx, :scheduler, thunk_id, master)
         immediate_next = finish_task!(state, node, node_order)
-        if !isempty(state.ready)
-            thunk = pop_with_affinity!(Context(ps), state.ready, proc, immediate_next)
+        if !isempty(state.ready) && !shall_remove_proc(ctx, proc, immediate_next)
+            thunk = pop_with_affinity!(Context(procs_to_use(ctx)), state.ready, proc, immediate_next)
             if thunk !== nothing
                 fire_task!(ctx, thunk, proc, state, chan, node_order)
             end
@@ -154,6 +157,33 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
     state.cache[d]
 end
 
+procs_to_use(ctx) = procs_to_use(ctx, ctx.options)
+function procs_to_use(ctx, options)
+    return if options.single !== 0
+        @assert options.single in vcat(1, workers()) "Sch option 'single' must specify an active worker id"
+        OSProc[OSProc(options.single)]
+    else
+        procs(ctx)
+    end
+end
+
+# The main responsibility of this function is to check whether new procs have been pushed to the context
+function assign_new_procs!(ctx, state, chan, node_order, assignedprocs=[])
+    ps = procs_to_use(ctx)
+    # Must track individual procs to handle the case when procs are removed
+    for p in setdiff(ps, assignedprocs)
+        isempty(state.ready) && break
+        task = pop_with_affinity!(ctx, state.ready, p, false)
+        if task !== nothing
+            fire_task!(ctx, task, p, state, chan, node_order)
+        end
+    end
+    return ps
+end
+
+# Might be a good policy to not remove the proc if immediate_next
+shall_remove_proc(ctx, proc, immediate_next) = proc ∉ procs_to_use(ctx)
+
 function pop_with_affinity!(ctx, tasks, proc, immediate_next)
     # allow JIT specialization on Pairs
     mapfirst(c) = first.(c)
diff --git a/test/processors.jl b/test/processors.jl
index 93ba80548..a3b7d2dab 100644
--- a/test/processors.jl
+++ b/test/processors.jl
@@ -76,4 +76,19 @@ end
             @everywhere pop!(Dagger.PROCESSOR_CALLBACKS)
         end
     end
+
+    @testset "Modify workers in Context" begin
+        ps = addprocs(4, exeflags="--project")
+        @everywhere ps using Dagger
+
+        ctx = Context(ps[1:2])
+
+        Dagger.addprocs!(ctx, ps[3:end])
+        @test map(p -> p.pid, procs(ctx)) == ps
+
+        Dagger.rmprocs!(ctx, ps[3:end])
+        @test map(p -> p.pid, procs(ctx)) == ps[1:2]
+
+        wait(rmprocs(ps))
+    end
 end
diff --git a/test/scheduler.jl b/test/scheduler.jl
index 2641a40b6..134f43781 100644
--- a/test/scheduler.jl
+++ b/test/scheduler.jl
@@ -66,31 +66,97 @@ end
     end
     @everywhere (pop!(Dagger.PROCESSOR_CALLBACKS); empty!(Dagger.OSPROC_CACHE))
 
-    @testset "Add new workers" begin
-        using Distributed
-        ps1 = addprocs(2, exeflags="--project");
-
-        @everywhere begin
+    @testset "Modify workers in running job" begin
+        # Test that we can add/remove workers while the scheduler is running.
+        # As this requires asynchrony, a flag is used to stall the tasks to
+        # ensure workers are actually modified while the scheduler is working.
+
+        setup = quote
             using Dagger, Distributed
-            # Condition to guarantee that processing is not completed before we add new workers
-            c = Condition()
+            # blocked is used to guarantee that processing is not completed before we modify the workers
+            # Note: blocked is used in the expressions below
+            blocked = true
             function testfun(i)
-                i < 2 && return myid()
-                wait(c)
-                return myid()
+                i < 4 && return myid()
+                # Wait for the test to do its thing before we proceed
+                while blocked
+                    sleep(0.001)
+                end
+                return myid()
             end
         end
-
-        ts = delayed(vcat)((delayed(testfun)(i) for i in 1:4)...);
-        job = @async collect(Context(ps1), ts);
-
-        ps2 = addprocs(2, exeflags="--project");
-
-        while !istaskdone(job)
-            @everywhere ps1 notify(c)
+
+        @testset "Add new workers" begin
+            ps = []
+            try
+                ps1 = addprocs(2, exeflags="--project")
+                append!(ps, ps1)
+
+                @everywhere vcat(ps1, myid()) $setup
+
+                ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...)
+
+                ctx = Context(ps1)
+                job = @async collect(ctx, ts)
+
+                while !istaskstarted(job)
+                    sleep(0.001)
+                end
+
+                # These will not be added to ctx, so they should never appear in the output
+                ps2 = addprocs(2, exeflags="--project")
+                append!(ps, ps2)
+
+                ps3 = addprocs(2, exeflags="--project")
+                append!(ps, ps3)
+                @everywhere ps3 $setup
+                addprocs!(ctx, ps3)
+                @test length(procs(ctx)) == 4
+
+                @everywhere vcat(ps1, ps3) blocked=false
+
+                @test fetch(job) isa Vector
+                @test fetch(job) |> unique |> sort == vcat(ps1, ps3)
+
+            finally
+                wait(rmprocs(ps))
+            end
         end
-        @test fetch(job) |> unique |> sort == ps1
-        wait(rmprocs(vcat(ps1,ps2)))
+        @testset "Remove workers" begin
+            ps = []
+            try
+                ps1 = addprocs(4, exeflags="--project")
+                append!(ps, ps1)
+
+                @everywhere vcat(ps1, myid()) $setup
+
+                ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...)
+
+                ctx = Context(ps1)
+                job = @async collect(ctx, ts)
+
+                while !istaskstarted(job)
+                    sleep(0.001)
+                end
+
+                rmprocs!(ctx, ps1[3:end])
+                @test length(procs(ctx)) == 2
+
+                @everywhere ps1 blocked=false
+
+                res = fetch(job)
+                @test res isa Vector
+                # First, all four workers will report their IDs without blocking
+                # Then all four will be stuck waiting for the blocked flag to be cleared
+                # While they wait, ps1[3:end] are removed, but once blocked is cleared they will finish their assigned tasks before being removed
+                # Will probably break if workers are assigned more than one Thunk
+                @test res[1:8] |> unique |> sort == ps1
+                @test all(pid -> pid in ps1[1:2], res[9:end])
+
+            finally
+                wait(rmprocs(ps))
+            end
+        end
     end
 end
 
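For reference, here is a minimal usage sketch of the removal side of the new API. It is not part of the patch above; it mirrors the `docs/src/processors.md` example (which only covers adding workers), and the worker counts and sleep durations are illustrative assumptions.

```julia
using Distributed

ps = addprocs(4, exeflags="--project");
@everywhere using Distributed, Dagger

# Dummy tasks that sleep briefly and report which worker ran them
ts = delayed(vcat)((delayed(i -> (sleep(0.5); myid()))(i) for i in 1:20)...);

ctx = Context();               # picks up all currently available workers
job = @async collect(ctx, ts); # the scheduler runs in the background

# Drop the last two workers from the scheduler's pool: they will finish the
# tasks already assigned to them but will not receive any new ones
Dagger.rmprocs!(ctx, ps[3:end])

fetch(job) |> unique

# The removed workers are still regular Distributed workers; clean up when done
workers() |> rmprocs
```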