diff --git a/src/fault-handler.jl b/src/fault-handler.jl
index c74004d6..d2d35b54 100644
--- a/src/fault-handler.jl
+++ b/src/fault-handler.jl
@@ -124,7 +124,7 @@ function handle_fault(ctx, state, thunk, oldproc, chan, node_order)
 
     # Reschedule inputs from deadlist
     ps = procs(ctx)
-    @assert !isempty(ps) "No processes left!"
+    @assert !isempty(ps) "No workers left for fault handling!"
     newproc = rand(ps)
     while length(deadlist) > 0
diff --git a/src/processor.jl b/src/processor.jl
index bad64ad6..eda92570 100644
--- a/src/processor.jl
+++ b/src/processor.jl
@@ -286,16 +286,17 @@ Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock)
 """
     addprocs!(ctx::Context, xs)
 
-Add new workers `xs` to `ctx`.
+Add new workers `xs` to `ctx`. Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.
 
-Workers can be either `Processor`s or the underlying process ids as `Integer`s.
+Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
 """
 addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs))
-addprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
+addprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
     append!(ctx.procs, xs)
 end
+
 """
     rmprocs!(ctx::Context, xs)
@@ -303,10 +304,9 @@ Remove the specified workers `xs` from `ctx`.
 Workers will typically finish all their assigned tasks if scheduling is ongoing
 but will not be assigned new tasks after removal.
 
-Workers can be either `Processors` or the underlying process ids as `Integer`s.
+Workers can be either `Processor`s or the underlying process IDs as `Integer`s.
 """
 rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs))
-rmprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
+rmprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
     filter!(p -> p ∉ xs, ctx.procs)
 end
-
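Together the two docstrings above describe the dynamic-worker API. A minimal usage sketch, assuming a local Distributed cluster; the worker counts and index ranges are illustrative, not taken from the patch:

```julia
using Distributed
ps = addprocs(3, exeflags="--project")
@everywhere using Dagger

ctx = Context(ps[1:2])          # schedule only on the first two workers

Dagger.addprocs!(ctx, ps[3:3])  # integer pids are converted to OSProc by the method above
@assert length(procs(ctx)) == 3

Dagger.rmprocs!(ctx, ps[1:1])   # removed workers finish tasks already assigned to them
@assert length(procs(ctx)) == 2

wait(rmprocs(ps))               # tear the cluster back down
```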
diff --git a/src/scheduler.jl b/src/scheduler.jl
index 8f602465..efbb6e7e 100644
--- a/src/scheduler.jl
+++ b/src/scheduler.jl
@@ -106,21 +106,24 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
             end
         end
 
-        # This is a bit redundant as the @async task below does basically the same job
-        # Without it though, testing of process modification becomes non-deterministic
-        # (due to sleep in CI environment) which is why it is still here.
+        # This is a bit redundant, as the @async task below does basically the
+        # same job. Without it, though, testing of process modification becomes
+        # non-deterministic (due to `sleep` in the CI environment), which is
+        # why it is still here.
         procs_state = assign_new_procs!(ctx, state, chan, node_order, procs_state)
-
+
         if isempty(state.running)
             # the block above fired only meta tasks
            continue
        end
-        check_integrity(ctx)
-        # Check periodically for new workers in a parallel task so that we don't accidentally end up
-        # having to wait for 'take!(chan)' on some large task before new workers are put to work
-        # Lock is used to stop this task as soon as something pops out from the channel to minimize
-        # risk that the task schedules thunks simultaneously as the main task (after future refactoring).
+
+        # Check periodically for new workers in a parallel task so that we
+        # don't accidentally end up having to wait for `take!(chan)` on some
+        # large task before new workers are put to work. Locking is used to
+        # stop this task as soon as something pops out from the channel, to
+        # minimize the risk that this task schedules thunks at the same time
+        # as the main task (after future refactoring).
         newtasks_lock = ReentrantLock()
         @async while !isempty(state.ready) || !isempty(state.running)
             sleep(1)
@@ -152,7 +155,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
         @dbg timespan_start(ctx, :scheduler, thunk_id, master)
         immediate_next = finish_task!(state, node, node_order)
 
-        if !isempty(state.ready) && !shall_remove_proc(ctx, proc, immediate_next)
+        if !isempty(state.ready) && !shall_remove_proc(ctx, proc)
             thunk = pop_with_affinity!(Context(procs_to_use(ctx)), state.ready, proc, immediate_next)
             if thunk !== nothing
                 fire_task!(ctx, thunk, proc, state, chan, node_order)
@@ -163,18 +166,16 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
     state.cache[d]
 end
 
-procs_to_use(ctx) = procs_to_use(ctx, ctx.options)
-function procs_to_use(ctx, options)
+function procs_to_use(ctx, options=ctx.options)
     return if options.single !== 0
-        @assert options.single in vcat(1, workers()) "Sch option 'single' must specify an active worker id"
+        @assert options.single in vcat(1, workers()) "Sch option `single` must specify an active worker ID."
         OSProc[OSProc(options.single)]
     else
         procs(ctx)
     end
 end
 
-check_integrity(ctx) = check_integrity(ctx, ctx.options)
-check_integrity(ctx, ::Any) = @assert !isempty(procs_to_use(ctx)) "No workers available!!"
+check_integrity(ctx) = @assert !isempty(procs_to_use(ctx)) "No suitable workers available in context."
 
 # Main responsibility of this function is to check if new procs have been pushed to the context
 function assign_new_procs!(ctx, state, chan, node_order, assignedprocs=[])
@@ -191,11 +192,10 @@ function assign_new_procs!(ctx, state, chan, node_order, assignedprocs=[])
 end
 
 # Might be a good policy to not remove the proc if immediate_next
-shall_remove_proc(ctx, proc, immediate_next) = proc ∉ procs_to_use(ctx)
+shall_remove_proc(ctx, proc) = proc ∉ procs_to_use(ctx)
 
-remove_dead_proc!(ctx, proc) = remove_dead_proc!(ctx, ctx.options, proc)
-function remove_dead_proc!(ctx, options, proc)
-    @assert options.single !== proc.pid "Single worker failed!"
+function remove_dead_proc!(ctx, proc, options=ctx.options)
+    @assert options.single !== proc.pid "Single worker failed, cannot continue."
     rmprocs!(ctx, [proc])
 end
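The three refactors in this file (`procs_to_use`, `check_integrity`, `remove_dead_proc!`) all collapse a two-method "forward `ctx.options`" pattern into a single method with a default argument. A self-contained sketch of that pattern; the `Opts` and `Ctx` types below are hypothetical stand-ins, not Dagger's real types:

```julia
struct Opts
    single::Int              # 0 means "use all procs in the context"
end

struct Ctx
    procs::Vector{Int}
    options::Opts
end

# Before the refactor, supplying ctx.options required a forwarding method:
#     procs_to_use(ctx) = procs_to_use(ctx, ctx.options)
#     function procs_to_use(ctx, options) ... end
# A default argument expresses the same thing in one definition, and callers
# can still override the options explicitly when needed.
function procs_to_use(ctx::Ctx, options::Opts=ctx.options)
    return options.single !== 0 ? [options.single] : ctx.procs
end

ctx = Ctx([2, 3, 4], Opts(0))
@assert procs_to_use(ctx) == [2, 3, 4]     # defaults to ctx.options
@assert procs_to_use(ctx, Opts(3)) == [3]  # explicit override
```

In Julia a default argument may reference earlier parameters and is evaluated per call, so the two forms are equivalent at every call site while halving the method count.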
diff --git a/test/processors.jl b/test/processors.jl
index a3b7d2da..f868f1ea 100644
--- a/test/processors.jl
+++ b/test/processors.jl
@@ -80,7 +80,7 @@ end
 @testset "Modify workers in Context" begin
     ps = addprocs(4, exeflags="--project")
     @everywhere ps using Dagger
-
+
     ctx = Context(ps[1:2])
 
     Dagger.addprocs!(ctx, ps[3:end])
diff --git a/test/scheduler.jl b/test/scheduler.jl
index 70df5862..b628b67f 100644
--- a/test/scheduler.jl
+++ b/test/scheduler.jl
@@ -68,41 +68,41 @@ end
 @testset "Modify workers in running job" begin
     # Test that we can add/remove workers while scheduler is running.
-    # As this requires asynchronity a flag is used to stall the tasks to
-    # ensure workers are actually modified while the scheduler is working
+    # As this requires asynchrony, a flag is used to stall the tasks to
+    # ensure workers are actually modified while the scheduler is working.
     setup = quote
-        using Dagger, Distributed
-        # blocked is to guarantee that processing is not completed before we add new workers
-        # Note: blocked is used in expressions below
-        blocked = true
-        function testfun(i)
+        using Dagger, Distributed
+        # blocked is to guarantee that processing is not completed before we add new workers
+        # Note: blocked is used in expressions below
+        blocked = true
+        function testfun(i)
             i < 4 && return myid()
             # Wait for test to do its thing before we proceed
             while blocked
                 sleep(0.001)
             end
             return myid()
-        end
+        end
     end
 
     @testset "Add new workers" begin
         ps = []
-        try
+        try
             ps1 = addprocs(2, exeflags="--project")
             append!(ps, ps1)
 
             @everywhere vcat(ps1, myid()) $setup
-
+
             ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...)
 
             ctx = Context(ps1)
             job = @async collect(ctx, ts)
 
-            while !istaskstarted(job)
+            while !istaskstarted(job)
                 sleep(0.001)
             end
-
+
             # Will not be added, so they should never appear in output
             ps2 = addprocs(2, exeflags="--project")
             append!(ps, ps2)
@@ -112,12 +112,11 @@ end
             @everywhere ps3 $setup
             addprocs!(ctx, ps3)
             @test length(procs(ctx)) == 4
-
+
             @everywhere vcat(ps1, ps3) blocked=false
-
+
             @test fetch(job) isa Vector
             @test fetch(job) |> unique |> sort == vcat(ps1, ps3)
-
         finally
             wait(rmprocs(ps))
         end
@@ -125,18 +124,18 @@ end
     @testset "Remove workers" begin
         ps = []
-        try
+        try
             ps1 = addprocs(4, exeflags="--project")
             append!(ps, ps1)
 
             @everywhere vcat(ps1, myid()) $setup
-
+
             ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...)
 
             ctx = Context(ps1)
 
             job = @async collect(ctx, ts)
 
-            while !istaskstarted(job)
+            while !istaskstarted(job)
                 sleep(0.001)
             end
@@ -144,16 +143,16 @@ end
             @test length(procs(ctx)) == 2
 
             @everywhere ps1 blocked=false
-
+
             res = fetch(job)
             @test res isa Vector
             # First all four workers will report their IDs without hassle
-            # Then all four will be waiting for the Condition
-            # While they are waiting ps1[3:end] are removed, but when the Condition is notified they will finish their tasks before being removed
-            # Will probably break if workers are assigned more than one Thunk
+            # Then all four will be waiting for the Condition. While they
+            # are waiting, ps1[3:end] are removed, but when the Condition is
+            # notified they will finish their tasks before being removed.
+            # This will probably break if workers are assigned more than one Thunk.
             @test res[1:8] |> unique |> sort == ps1
             @test all(pid -> pid in ps1[1:2], res[9:end])
-
         finally
             wait(rmprocs(ps))
         end
@@ -161,18 +160,18 @@ end
     @testset "Remove all workers throws" begin
         ps = []
-        try
+        try
             ps1 = addprocs(2, exeflags="--project")
             append!(ps, ps1)
 
             @everywhere vcat(ps1, myid()) $setup
-
+
             ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...)
 
             ctx = Context(ps1)
             job = @async collect(ctx, ts)
 
-            while !istaskstarted(job)
+            while !istaskstarted(job)
                 sleep(0.001)
             end
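For reference, the add-workers-mid-job scenario these tests exercise reduces to roughly the following sketch. It is a simplified variant of the "Add new workers" testset above, without the `blocked` flag, so the timing of `addprocs!` relative to job completion is not guaranteed here:

```julia
using Distributed
ps1 = addprocs(2, exeflags="--project")
@everywhere using Dagger

# A toy DAG whose leaves report the worker they ran on.
ts = delayed(vcat)((delayed(myid)() for _ in 1:8)...)

ctx = Context(ps1)
job = @async collect(ctx, ts)   # run the scheduler in the background

ps2 = addprocs(1, exeflags="--project")
@everywhere using Dagger
Dagger.addprocs!(ctx, ps2)      # picked up on a later scheduler iteration

@assert fetch(job) isa Vector   # pids of the workers that ran the leaves
wait(rmprocs(vcat(ps1, ps2)))
```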