From 65d8073d9bbd617ed4d83173e5b6fd1e507f3285 Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Thu, 1 Oct 2020 21:10:44 +0200 Subject: [PATCH 01/10] Add possibiliy to add new workers to context --- src/processor.jl | 23 ++++++++++++++++++++++ src/scheduler.jl | 25 +++++++++++++++++------- test/scheduler.jl | 49 +++++++++++++++++++++++++++++++++++------------ 3 files changed, 78 insertions(+), 19 deletions(-) diff --git a/src/processor.jl b/src/processor.jl index 77bab713a..d9ce9d5b6 100644 --- a/src/processor.jl +++ b/src/processor.jl @@ -238,6 +238,7 @@ mutable struct Context log_sink::Any profile::Bool options + proc_lock::ReentrantLock end """ @@ -256,6 +257,7 @@ as a `Vector{Int}`. function Context(xs) Context(xs, NoOpLog(), false, nothing) # By default don't log events end +Context(xs, log_sink, profile, options) = Context(xs, log_sink, profile, options, ReentrantLock()) Context(xs::Vector{Int}) = Context(map(OSProc, xs)) function Context() procs = [OSProc(w) for w in workers()] @@ -271,3 +273,24 @@ Write a log event function write_event(ctx::Context, event::Event) write_event(ctx.log_sink, event) end + +""" + lock(f, ctx::Context) + +Acquire `ctx.proc_lock`, execute `f` with the lock held, and release the lock when `f` returns. +""" +Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock) + +""" + addprocs!(ctx::Context, xs) + +Add new workers `xs` to an existing `Context ctx`. + +Workers will typically be assigned new tasks in the next scheduling iteration. + +Workers can be either `Processors` or the underlying process ids as `Integer`s. +""" +addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs)) +addprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do + append!(ctx.procs, xs) +end diff --git a/src/scheduler.jl b/src/scheduler.jl index b6c17c65f..7183ec701 100644 --- a/src/scheduler.jl +++ b/src/scheduler.jl @@ -95,13 +95,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) node_order = x -> -get(ord, x, 0) state = start_state(deps, node_order) # start off some tasks - for p in ps - isempty(state.ready) && break - task = pop_with_affinity!(ctx, state.ready, p, false) - if task !== nothing - fire_task!(ctx, task, p, state, chan, node_order) - end - end + worker_state = assign_new_workers!(ctx, procs, state, chan, node_order) @dbg timespan_end(ctx, :scheduler_init, 0, master) # Loop while we still have thunks to execute @@ -117,6 +111,9 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) end end + # Note: worker_state may be different things for different contexts. Don't touch it out here! 
+ worker_state = assign_new_workers!(ctx ,ps, state, chan, node_order, worker_state) + if isempty(state.running) # the block above fired only meta tasks continue @@ -154,6 +151,20 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) state.cache[d] end +function assign_new_workers!(ctx, ps, state, chan, node_order, nadded=0) + ps != procs(ctx) && return nadded + lock(ctx) do + for p in ps[1+nadded:end] + isempty(state.ready) && break + task = pop_with_affinity!(ctx, state.ready, p, false) + if task !== nothing + fire_task!(ctx, task, p, state, chan, node_order) + end + end + return length(ps) + end +end + function pop_with_affinity!(ctx, tasks, proc, immediate_next) # allow JIT specialization on Pairs mapfirst(c) = first.(c) diff --git a/test/scheduler.jl b/test/scheduler.jl index 2641a40b6..7af0ef782 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -67,30 +67,55 @@ end @everywhere (pop!(Dagger.PROCESSOR_CALLBACKS); empty!(Dagger.OSPROC_CACHE)) @testset "Add new workers" begin + # Test that we can add new workers to an ongoing task. + # As this requires asynchronity a Condition is used to stall the tasks to + # ensure workers are actually added while the scheduler is working using Distributed - ps1 = addprocs(2, exeflags="--project"); - - @everywhere begin + + setup = quote using Dagger, Distributed # Condition to guarantee that processing is not completed before we add new workers c = Condition() function testfun(i) - i < 2 && return myid() + i < 4 && return myid() wait(c) return myid() end end + + ps = [] + try + ps1 = addprocs(2, exeflags="--project"); + push!(ps, ps1) + + @everywhere $setup - ts = delayed(vcat)((delayed(testfun)(i) for i in 1:4)...); - job = @async collect(Context(ps1), ts); + ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...); + + ctx = Context(ps1) + job = @async collect(ctx, ts); - ps2 = addprocs(2, exeflags="--project"); + # Will not be added, so they should never appear in output + ps2 = addprocs(2, exeflags="--project"); + push!(ps, ps2) + + ps3 = addprocs(2, exeflags="--project") + push!(ps, ps3) + @everywhere ps3 $setup + Dagger.addprocs!(ctx, ps3) - while !istaskdone(job) - @everywhere ps1 notify(c) - end - @test fetch(job) |> unique |> sort == ps1 + while !istaskdone(job) + sleep(0.01) + if istaskstarted(job) + @everywhere ps1 notify(c) + @everywhere ps3 notify(c) + end + end + @test fetch(job) isa Vector + @test fetch(job) |> unique |> sort == vcat(ps1, ps3) - wait(rmprocs(vcat(ps1,ps2))) + finally + wait(rmprocs(ps)) + end end end From 5e67f087ede6f3b471758d73610611e4d5ba4b3f Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Thu, 1 Oct 2020 23:19:19 +0200 Subject: [PATCH 02/10] Add option to remove workers from context --- src/processor.jl | 20 +++++++-- src/scheduler.jl | 22 ++++++---- test/processors.jl | 15 +++++++ test/scheduler.jl | 102 +++++++++++++++++++++++++++++++-------------- 4 files changed, 118 insertions(+), 41 deletions(-) diff --git a/src/processor.jl b/src/processor.jl index d9ce9d5b6..3f1344714 100644 --- a/src/processor.jl +++ b/src/processor.jl @@ -284,13 +284,27 @@ Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock) """ addprocs!(ctx::Context, xs) -Add new workers `xs` to an existing `Context ctx`. +Add new workers `xs` to `ctx`. -Workers will typically be assigned new tasks in the next scheduling iteration. +Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing. -Workers can be either `Processors` or the underlying process ids as `Integer`s. 
+Workers can be either `Processor`s or the underlying process ids as `Integer`s. """ addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs)) addprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do append!(ctx.procs, xs) end +""" + rmprocs!(ctx::Context, xs) + +Remove the specified workers `xs` from `ctx`. + +Workers will typically finish all their assigned tasks if scheduling is ongoing but will not be assigned new tasks after removal. + +Workers can be either `Processors` or the underlying process ids as `Integer`s. +""" +rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs)) +rmprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do + filter!(p -> p ∉ xs, ctx.procs) +end + diff --git a/src/scheduler.jl b/src/scheduler.jl index 7183ec701..f28f70391 100644 --- a/src/scheduler.jl +++ b/src/scheduler.jl @@ -3,7 +3,7 @@ module Sch using Distributed import MemPool: DRef -import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute! +import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs! include("fault-handler.jl") @@ -125,7 +125,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) @warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work" # Remove dead worker from procs list - filter!(p->p.pid!=proc.pid, ctx.procs) + rmprocs!(ctx, [proc]) ps = procs(ctx) handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan, node_order) @@ -140,7 +140,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) @dbg timespan_start(ctx, :scheduler, thunk_id, master) immediate_next = finish_task!(state, node, node_order) - if !isempty(state.ready) + if !isempty(state.ready) && !shall_remove_worker(ctx, proc, ps, immediate_next) thunk = pop_with_affinity!(Context(ps), state.ready, proc, immediate_next) if thunk !== nothing fire_task!(ctx, thunk, proc, state, chan, node_order) @@ -151,17 +151,25 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) state.cache[d] end -function assign_new_workers!(ctx, ps, state, chan, node_order, nadded=0) - ps != procs(ctx) && return nadded +function assign_new_workers!(ctx, ps, state, chan, node_order, assignedprocs=[]) + ps !== procs(ctx) && return assignedprocs lock(ctx) do - for p in ps[1+nadded:end] + # Must track individual procs to handle the case when procs are removed + for p in setdiff(ps, assignedprocs) isempty(state.ready) && break task = pop_with_affinity!(ctx, state.ready, p, false) if task !== nothing fire_task!(ctx, task, p, state, chan, node_order) end end - return length(ps) + return copy(ps) + end +end + +function shall_remove_worker(ctx, proc, ps, immediate_next) + ps !== procs(ctx) && return false + return lock(ctx) do + proc ∉ procs(ctx) end end diff --git a/test/processors.jl b/test/processors.jl index 93ba80548..a3b7d2dab 100644 --- a/test/processors.jl +++ b/test/processors.jl @@ -76,4 +76,19 @@ end @everywhere pop!(Dagger.PROCESSOR_CALLBACKS) end end + + @testset "Modify workers in Context" begin + ps = addprocs(4, exeflags="--project") + @everywhere ps using Dagger + + ctx = Context(ps[1:2]) + + Dagger.addprocs!(ctx, ps[3:end]) + @test map(p -> p.pid, 
procs(ctx)) == ps + + Dagger.rmprocs!(ctx, ps[3:end]) + @test map(p -> p.pid, procs(ctx)) == ps[1:2] + + wait(rmprocs(ps)) + end end diff --git a/test/scheduler.jl b/test/scheduler.jl index 7af0ef782..2eb521d06 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -66,15 +66,15 @@ end end @everywhere (pop!(Dagger.PROCESSOR_CALLBACKS); empty!(Dagger.OSPROC_CACHE)) - @testset "Add new workers" begin - # Test that we can add new workers to an ongoing task. + @testset "Modify workers in running job" begin + # Test that we can add/remove workers while scheduler is running. # As this requires asynchronity a Condition is used to stall the tasks to - # ensure workers are actually added while the scheduler is working - using Distributed - + # ensure workers are actually modified while the scheduler is working + setup = quote using Dagger, Distributed # Condition to guarantee that processing is not completed before we add new workers + # Note: c is used in expressions below c = Condition() function testfun(i) i < 4 && return myid() @@ -83,39 +83,79 @@ end end end + @testset "Add new workers" begin ps = [] try - ps1 = addprocs(2, exeflags="--project"); - push!(ps, ps1) - - @everywhere $setup - - ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...); - - ctx = Context(ps1) - job = @async collect(ctx, ts); - - # Will not be added, so they should never appear in output - ps2 = addprocs(2, exeflags="--project"); - push!(ps, ps2) - - ps3 = addprocs(2, exeflags="--project") - push!(ps, ps3) - @everywhere ps3 $setup - Dagger.addprocs!(ctx, ps3) - - while !istaskdone(job) - sleep(0.01) - if istaskstarted(job) + ps1 = addprocs(2, exeflags="--project"); + push!(ps, ps1) + + @everywhere vcat(ps1, myid()) $setup + + ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...); + + ctx = Context(ps1) + job = @async collect(ctx, ts); + + while !istaskstarted(job) + sleep(0.001) + end + + # Will not be added, so they should never appear in output + ps2 = addprocs(2, exeflags="--project"); + push!(ps, ps2) + + ps3 = addprocs(2, exeflags="--project") + push!(ps, ps3) + @everywhere ps3 $setup + Dagger.addprocs!(ctx, ps3) + @test length(procs(ctx)) == 4 + + while !istaskdone(job) @everywhere ps1 notify(c) @everywhere ps3 notify(c) end + @test fetch(job) isa Vector + @test fetch(job) |> unique |> sort == vcat(ps1, ps3) + + finally + wait(rmprocs(ps)) end - @test fetch(job) isa Vector - @test fetch(job) |> unique |> sort == vcat(ps1, ps3) + end + + @testset "Remove workers" begin + ps = [] + try + ps1 = addprocs(4, exeflags="--project"); + push!(ps, ps1) + + @everywhere vcat(ps1, myid()) $setup + + ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...); + + ctx = Context(ps1) + job = @async collect(ctx, ts); + + while !istaskstarted(job) + sleep(0.001) + end - finally - wait(rmprocs(ps)) + Dagger.rmprocs!(ctx, ps1[3:end]) + @test length(procs(ctx)) == 2 + + while !istaskdone(job) + @everywhere ps1 notify(c) + end + res = fetch(job) + @test res isa Vector + # First all four workers will report their IDs without hassle + # Then all four will be waiting for the Condition + # While they are waiting ps1[3:end] are removed, but when the Condition is notified they will finish their tasks before being removed + @test res[1:8] |> unique |> sort == ps1 + @test res[9:end] |> unique |> sort == ps1[1:2] + + finally + wait(rmprocs(ps)) + end end end end From ebefe0a3ef4578326dc6e6aa11deefe49b41a755 Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Thu, 1 Oct 2020 23:39:05 +0200 Subject: [PATCH 03/10] Add fault handler new 
proc to context --- src/fault-handler.jl | 3 +++ src/scheduler.jl | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/fault-handler.jl b/src/fault-handler.jl index f0996111d..eb688dbf7 100644 --- a/src/fault-handler.jl +++ b/src/fault-handler.jl @@ -124,6 +124,9 @@ function handle_fault(ctx, state, thunk, oldproc, chan, node_order) # Reschedule inputs from deadlist newproc = OSProc(rand(workers())) + if newproc ∉ procs(ctx) + addprocs!(ctx, [newproc]) + end while length(deadlist) > 0 dt = popfirst!(deadlist) if any((input in deadlist) for input in dt.inputs) diff --git a/src/scheduler.jl b/src/scheduler.jl index f28f70391..cf7eb04aa 100644 --- a/src/scheduler.jl +++ b/src/scheduler.jl @@ -3,7 +3,7 @@ module Sch using Distributed import MemPool: DRef -import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs! +import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs!, addprocs! include("fault-handler.jl") @@ -112,7 +112,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) end # Note: worker_state may be different things for different contexts. Don't touch it out here! - worker_state = assign_new_workers!(ctx ,ps, state, chan, node_order, worker_state) + worker_state = assign_new_workers!(ctx, ps, state, chan, node_order, worker_state) if isempty(state.running) # the block above fired only meta tasks From 5a3b0d0f93b23011d13d6ec15cf7dbde0ba301f2 Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Sat, 3 Oct 2020 20:09:00 +0200 Subject: [PATCH 04/10] Remove redunant semicolons and fix intendation --- test/scheduler.jl | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/test/scheduler.jl b/test/scheduler.jl index 2eb521d06..d0570e1a3 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -84,24 +84,24 @@ end end @testset "Add new workers" begin - ps = [] - try - ps1 = addprocs(2, exeflags="--project"); + ps = [] + try + ps1 = addprocs(2, exeflags="--project") push!(ps, ps1) @everywhere vcat(ps1, myid()) $setup - ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...); + ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...) ctx = Context(ps1) - job = @async collect(ctx, ts); + job = @async collect(ctx, ts) while !istaskstarted(job) sleep(0.001) end # Will not be added, so they should never appear in output - ps2 = addprocs(2, exeflags="--project"); + ps2 = addprocs(2, exeflags="--project") push!(ps, ps2) ps3 = addprocs(2, exeflags="--project") @@ -123,17 +123,17 @@ end end @testset "Remove workers" begin - ps = [] - try - ps1 = addprocs(4, exeflags="--project"); + ps = [] + try + ps1 = addprocs(4, exeflags="--project") push!(ps, ps1) @everywhere vcat(ps1, myid()) $setup - ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...); + ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...) ctx = Context(ps1) - job = @async collect(ctx, ts); + job = @async collect(ctx, ts) while !istaskstarted(job) sleep(0.001) From 517b0ae587d65c81516a5eb7ce84bfc713648ace Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Sat, 3 Oct 2020 20:17:46 +0200 Subject: [PATCH 05/10] Fix incorrect push! 
of array to array --- test/scheduler.jl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/scheduler.jl b/test/scheduler.jl index d0570e1a3..e6a85975a 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -87,7 +87,7 @@ end ps = [] try ps1 = addprocs(2, exeflags="--project") - push!(ps, ps1) + append!(ps, ps1) @everywhere vcat(ps1, myid()) $setup @@ -102,10 +102,10 @@ end # Will not be added, so they should never appear in output ps2 = addprocs(2, exeflags="--project") - push!(ps, ps2) + append!(ps, ps2) ps3 = addprocs(2, exeflags="--project") - push!(ps, ps3) + append!(ps, ps3) @everywhere ps3 $setup Dagger.addprocs!(ctx, ps3) @test length(procs(ctx)) == 4 @@ -126,7 +126,7 @@ end ps = [] try ps1 = addprocs(4, exeflags="--project") - push!(ps, ps1) + append!(ps, ps1) @everywhere vcat(ps1, myid()) $setup From 85cea093fdd9b37d51799dcfb9049cf7b041a3bf Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Sat, 3 Oct 2020 20:45:38 +0200 Subject: [PATCH 06/10] Use bool instead of condition --- test/scheduler.jl | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/test/scheduler.jl b/test/scheduler.jl index e6a85975a..571e21f10 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -68,18 +68,21 @@ end @testset "Modify workers in running job" begin # Test that we can add/remove workers while scheduler is running. - # As this requires asynchronity a Condition is used to stall the tasks to + # As this requires asynchronity a flag is used to stall the tasks to # ensure workers are actually modified while the scheduler is working setup = quote using Dagger, Distributed - # Condition to guarantee that processing is not completed before we add new workers - # Note: c is used in expressions below - c = Condition() + # blocked is to guarantee that processing is not completed before we add new workers + # Note: blocked is used in expressions below + blocked = true function testfun(i) - i < 4 && return myid() - wait(c) - return myid() + i < 4 && return myid() + # Wait for test to do its thing before we proceed + while blocked + sleep(0.001) + end + return myid() end end @@ -90,7 +93,7 @@ end append!(ps, ps1) @everywhere vcat(ps1, myid()) $setup - + ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...) ctx = Context(ps1) @@ -110,10 +113,8 @@ end Dagger.addprocs!(ctx, ps3) @test length(procs(ctx)) == 4 - while !istaskdone(job) - @everywhere ps1 notify(c) - @everywhere ps3 notify(c) - end + @everywhere vcat(ps1, ps3) blocked=false + @test fetch(job) isa Vector @test fetch(job) |> unique |> sort == vcat(ps1, ps3) @@ -142,9 +143,8 @@ end Dagger.rmprocs!(ctx, ps1[3:end]) @test length(procs(ctx)) == 2 - while !istaskdone(job) - @everywhere ps1 notify(c) - end + @everywhere ps1 blocked=false + res = fetch(job) @test res isa Vector # First all four workers will report their IDs without hassle From 0cd584b06510d017ee4ce88f32890569a0cbda84 Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Sat, 3 Oct 2020 21:13:35 +0200 Subject: [PATCH 07/10] Export addprocs! and rmprocs! --- src/processor.jl | 2 +- test/scheduler.jl | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/processor.jl b/src/processor.jl index 3f1344714..70058f23f 100644 --- a/src/processor.jl +++ b/src/processor.jl @@ -1,4 +1,4 @@ -export OSProc, Context +export OSProc, Context, addprocs!, rmprocs! 
""" Processor diff --git a/test/scheduler.jl b/test/scheduler.jl index 571e21f10..9d8912b95 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -110,7 +110,7 @@ end ps3 = addprocs(2, exeflags="--project") append!(ps, ps3) @everywhere ps3 $setup - Dagger.addprocs!(ctx, ps3) + addprocs!(ctx, ps3) @test length(procs(ctx)) == 4 @everywhere vcat(ps1, ps3) blocked=false @@ -140,7 +140,7 @@ end sleep(0.001) end - Dagger.rmprocs!(ctx, ps1[3:end]) + rmprocs!(ctx, ps1[3:end]) @test length(procs(ctx)) == 2 @everywhere ps1 blocked=false @@ -151,7 +151,7 @@ end # Then all four will be waiting for the Condition # While they are waiting ps1[3:end] are removed, but when the Condition is notified they will finish their tasks before being removed @test res[1:8] |> unique |> sort == ps1 - @test res[9:end] |> unique |> sort == ps1[1:2] + @test all(pid -> pid in ps1[1:2], res[9:end]) finally wait(rmprocs(ps)) From 0e50a7905aa99553ec3b218a4ed234299cad5906 Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Sat, 3 Oct 2020 21:41:20 +0200 Subject: [PATCH 08/10] Add lock and defensive copy to procs(ctx) --- src/processor.jl | 4 +++- src/scheduler.jl | 55 +++++++++++++++++++++++------------------------ test/scheduler.jl | 1 + 3 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/processor.jl b/src/processor.jl index 70058f23f..bad64ad69 100644 --- a/src/processor.jl +++ b/src/processor.jl @@ -263,7 +263,9 @@ function Context() procs = [OSProc(w) for w in workers()] Context(procs) end -procs(ctx::Context) = ctx.procs +procs(ctx::Context) = lock(ctx) do + copy(ctx.procs) +end """ write_event(ctx::Context, event::Event) diff --git a/src/scheduler.jl b/src/scheduler.jl index cf7eb04aa..db8487ed0 100644 --- a/src/scheduler.jl +++ b/src/scheduler.jl @@ -82,12 +82,6 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) master = OSProc(myid()) @dbg timespan_start(ctx, :scheduler_init, 0, master) - if options.single !== 0 - @assert options.single in vcat(1, workers()) "Sch option 'single' must specify an active worker id" - ps = OSProc[OSProc(options.single)] - else - ps = procs(ctx) - end chan = Channel{Any}(32) deps = dependents(d) ord = order(d, noffspring(deps)) @@ -95,14 +89,14 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) node_order = x -> -get(ord, x, 0) state = start_state(deps, node_order) # start off some tasks - worker_state = assign_new_workers!(ctx, procs, state, chan, node_order) + procs_state = assign_new_procs!(ctx, state, chan, node_order) @dbg timespan_end(ctx, :scheduler_init, 0, master) # Loop while we still have thunks to execute while !isempty(state.ready) || !isempty(state.running) if isempty(state.running) && !isempty(state.ready) # Nothing running, so schedule up to N thunks, 1 per N workers - for p in ps + for p in procs_to_use(ctx) isempty(state.ready) && break task = pop_with_affinity!(ctx, state.ready, p, false) if task !== nothing @@ -112,7 +106,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) end # Note: worker_state may be different things for different contexts. Don't touch it out here! 
- worker_state = assign_new_workers!(ctx, ps, state, chan, node_order, worker_state) + procs_state = assign_new_procs!(ctx, state, chan, node_order, procs_state) if isempty(state.running) # the block above fired only meta tasks @@ -125,8 +119,8 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) @warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work" # Remove dead worker from procs list + # Not sure what is desired behaviour if option.singleworker is set... rmprocs!(ctx, [proc]) - ps = procs(ctx) handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan, node_order) continue @@ -140,8 +134,8 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) @dbg timespan_start(ctx, :scheduler, thunk_id, master) immediate_next = finish_task!(state, node, node_order) - if !isempty(state.ready) && !shall_remove_worker(ctx, proc, ps, immediate_next) - thunk = pop_with_affinity!(Context(ps), state.ready, proc, immediate_next) + if !isempty(state.ready) && !shall_remove_proc(ctx, proc, immediate_next) + thunk = pop_with_affinity!(Context(procs_to_use(ctx)), state.ready, proc, immediate_next) if thunk !== nothing fire_task!(ctx, thunk, proc, state, chan, node_order) end @@ -151,28 +145,33 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) state.cache[d] end -function assign_new_workers!(ctx, ps, state, chan, node_order, assignedprocs=[]) - ps !== procs(ctx) && return assignedprocs - lock(ctx) do - # Must track individual procs to handle the case when procs are removed - for p in setdiff(ps, assignedprocs) - isempty(state.ready) && break - task = pop_with_affinity!(ctx, state.ready, p, false) - if task !== nothing - fire_task!(ctx, task, p, state, chan, node_order) - end - end - return copy(ps) +procs_to_use(ctx) = procs_to_use(ctx, ctx.options) +function procs_to_use(ctx, options) + return if options.single !== 0 + @assert options.single in vcat(1, workers()) "Sch option 'single' must specify an active worker id" + OSProc[OSProc(options.single)] + else + procs(ctx) end end -function shall_remove_worker(ctx, proc, ps, immediate_next) - ps !== procs(ctx) && return false - return lock(ctx) do - proc ∉ procs(ctx) +# Main responsibility of this function is to check if new procs have been pushed to the context +function assign_new_procs!(ctx, state, chan, node_order, assignedprocs=[]) + ps = procs_to_use(ctx) + # Must track individual procs to handle the case when procs are removed + for p in setdiff(ps, assignedprocs) + isempty(state.ready) && break + task = pop_with_affinity!(ctx, state.ready, p, false) + if task !== nothing + fire_task!(ctx, task, p, state, chan, node_order) + end end + return ps end +# Might be a good policy to not remove the proc if immediate_next +shall_remove_proc(ctx, proc, immediate_next) = proc ∉ procs_to_use(ctx) + function pop_with_affinity!(ctx, tasks, proc, immediate_next) # allow JIT specialization on Pairs mapfirst(c) = first.(c) diff --git a/test/scheduler.jl b/test/scheduler.jl index 9d8912b95..134f43781 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -150,6 +150,7 @@ end # First all four workers will report their IDs without hassle # Then all four will be waiting for the Condition # While they are waiting ps1[3:end] are removed, but when the Condition is notified they will finish their tasks before being removed + # Will probably break if workers are assigned more than one Thunk @test res[1:8] |> unique |> sort == ps1 @test all(pid -> pid in ps1[1:2], res[9:end]) From 
29bd4210d8c28e24f53a57231bd9fac174fec941 Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Sat, 3 Oct 2020 22:15:03 +0200 Subject: [PATCH 09/10] Add background task to assign new workers --- src/scheduler.jl | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/scheduler.jl b/src/scheduler.jl index db8487ed0..134602e50 100644 --- a/src/scheduler.jl +++ b/src/scheduler.jl @@ -89,9 +89,20 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) node_order = x -> -get(ord, x, 0) state = start_state(deps, node_order) # start off some tasks + # Note: worker_state may be different things for different contexts. Don't touch it out here! procs_state = assign_new_procs!(ctx, state, chan, node_order) @dbg timespan_end(ctx, :scheduler_init, 0, master) + # Check periodically for new workers in a parallel task so that we don't accidentally end up + # having to wait for 'take!(chan)' on some large task before new workers are put to work + newprocs_lock = ReentrantLock() + @async while !isempty(state.ready) || !isempty(state.running) + sleep(1) + procs_state = lock(newprocs_lock) do + assign_new_procs!(ctx, state, chan, node_order, procs_state) + end + end + # Loop while we still have thunks to execute while !isempty(state.ready) || !isempty(state.running) if isempty(state.running) && !isempty(state.ready) @@ -105,8 +116,9 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) end end - # Note: worker_state may be different things for different contexts. Don't touch it out here! - procs_state = assign_new_procs!(ctx, state, chan, node_order, procs_state) + procs_state = lock(newprocs_lock) do + assign_new_procs!(ctx, state, chan, node_order, procs_state) + end if isempty(state.running) # the block above fired only meta tasks From 681795a1e32c24f5afa9d8cf6b4bc8e4b663df95 Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Sat, 3 Oct 2020 22:31:22 +0200 Subject: [PATCH 10/10] Add section about Dynamic worker pools --- docs/src/processors.md | 45 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/docs/src/processors.md b/docs/src/processors.md index 470ac9865..a50fd4469 100644 --- a/docs/src/processors.md +++ b/docs/src/processors.md @@ -70,3 +70,48 @@ complicated detection and recovery process, including multiple master processes, a distributed and replicated database such as etcd, and checkpointing of the scheduler to ensure an efficient recovery. Such a system does not yet exist, but contributions for such a change are desired. + +## Dynamic worker pools + +Daggers default scheduler supports modifying the worker pool while the +scheduler is running. This is done by modifying the `Processor`s of the +`Context` supplied to the scheduler at initialization using +`addprocs!(ctx, ps)` and `rmprocs(ctx, ps)` where `ps` can be `Processors` or + just process ids. + +An example of when this is useful is in HPC environments where individual +jobs to start up workers are queued so that not all workers are guaranteed to +be available at the same time. + +New workers will typically be assigned new tasks as soon as the scheduler +sees them. Removed workers will finish all their assigned tasks but will not +be assigned any new tasks. Note that this makes it difficult to determine when +a worker is no longer in use by Dagger. 
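+
+One conservative pattern is to only reclaim the underlying Julia processes once the
+job that used the context has finished; a minimal sketch (assuming `ctx`, `job` and
+the worker ids `ps` are set up as in the example below):
+
+```julia
+rmprocs!(ctx, ps)  # stop handing new thunks to these workers
+wait(job)          # once the job is done they have finished their assigned thunks
+rmprocs(ps)        # now the processes can be reclaimed via Distributed
+```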
+
+Example:
+
+```julia
+using Distributed
+
+ps1 = addprocs(2, exeflags="--project");
+@everywhere using Distributed, Dagger
+
+# Dummy task to wait for 0.5 seconds and then return the id of the worker
+ts = delayed(vcat)((delayed(i -> (sleep(0.5); myid()))(i) for i in 1:20)...);
+
+ctx = Context();
+# Scheduler is blocking, so we need a new task to add workers while it runs
+job = @async collect(ctx, ts);
+
+# Let's fire up some new workers
+ps2 = addprocs(2, exeflags="--project");
+@everywhere using Distributed, Dagger
+# New workers are not available until we do this
+addprocs!(ctx, ps2)
+
+# Let's hope the job didn't complete before workers were added :)
+fetch(job) |> unique
+
+# and clean up after ourselves...
+workers() |> rmprocs
+```
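+
+Removing workers from the context works the same way. A minimal sketch along the
+same lines (using a fresh set of workers, since the cleanup above removed the
+previous ones):
+
+```julia
+ps3 = addprocs(4, exeflags="--project");
+@everywhere using Distributed, Dagger
+
+ctx = Context(ps3);
+ts = delayed(vcat)((delayed(i -> (sleep(0.5); myid()))(i) for i in 1:20)...);
+job = @async collect(ctx, ts);
+
+# Stop assigning new thunks to half of the workers.
+# They will still finish any thunks they have already been handed.
+rmprocs!(ctx, ps3[3:4])
+
+# Later results should therefore mostly come from the remaining workers
+fetch(job) |> unique
+
+# and clean up again
+workers() |> rmprocs
+```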