Skip to content

Commit

Permalink
Add option to remove workers from context
Browse files Browse the repository at this point in the history
  • Loading branch information
DrChainsaw committed Oct 1, 2020
1 parent 65d8073 commit 5e67f08
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 41 deletions.
20 changes: 17 additions & 3 deletions src/processor.jl
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,27 @@ Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock)
"""
    addprocs!(ctx::Context, xs)

Add new workers `xs` to `ctx`.

Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.

Workers can be either `Processor`s or the underlying process ids as `Integer`s.
"""
function addprocs!(ctx::Context, xs::AbstractVector{<:Integer})
    # Wrap the raw process ids in `OSProc` and defer to the `Processor` method.
    return addprocs!(ctx, map(OSProc, xs))
end
function addprocs!(ctx::Context, xs::AbstractVector{<:Processor})
    # Mutate `ctx.procs` only while holding the context lock.
    return lock(ctx) do
        append!(ctx.procs, xs)
    end
end
"""
rmprocs!(ctx::Context, xs)
Remove the specified workers `xs` from `ctx`.
Workers will typically finish all their assigned tasks if scheduling is ongoing but will not be assigned new tasks after removal.
Workers can be either `Processor`s or the underlying process ids as `Integer`s.
"""
# Integer process ids are wrapped in `OSProc` and forwarded to the `Processor` method.
rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs))
rmprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
    # Keep only the processors that were not requested for removal.
    # Mutates `ctx.procs` in place while holding the context lock.
    filter!(p -> !(p in xs), ctx.procs)
end

22 changes: 15 additions & 7 deletions src/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Sch
using Distributed
import MemPool: DRef

import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!
import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs!

include("fault-handler.jl")

Expand Down Expand Up @@ -125,7 +125,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
@warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work"

# Remove dead worker from procs list
filter!(p->p.pid!=proc.pid, ctx.procs)
rmprocs!(ctx, [proc])
ps = procs(ctx)

handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan, node_order)
Expand All @@ -140,7 +140,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())

@dbg timespan_start(ctx, :scheduler, thunk_id, master)
immediate_next = finish_task!(state, node, node_order)
if !isempty(state.ready)
if !isempty(state.ready) && !shall_remove_worker(ctx, proc, ps, immediate_next)
thunk = pop_with_affinity!(Context(ps), state.ready, proc, immediate_next)
if thunk !== nothing
fire_task!(ctx, thunk, proc, state, chan, node_order)
Expand All @@ -151,17 +151,25 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
state.cache[d]
end

"""
    assign_new_workers!(ctx, ps, state, chan, node_order, assignedprocs=[])

Fire ready tasks on the workers in `ps` that are not in `assignedprocs`.

Returns `assignedprocs` untouched when `ps` is not the same object as `procs(ctx)`.
Otherwise returns a copy of `ps`, intended to be passed back in as `assignedprocs`
on the next call.
"""
function assign_new_workers!(ctx, ps, state, chan, node_order, assignedprocs=[])
    ps !== procs(ctx) && return assignedprocs
    return lock(ctx) do
        # Must track individual procs to handle the case when procs are removed
        for p in setdiff(ps, assignedprocs)
            # Nothing left to hand out; stop early.
            isempty(state.ready) && break
            task = pop_with_affinity!(ctx, state.ready, p, false)
            if task !== nothing
                fire_task!(ctx, task, p, state, chan, node_order)
            end
        end
        # Snapshot, so later mutation of `ps` does not alter the returned record.
        return copy(ps)
    end
end

"""
    shall_remove_worker(ctx, proc, ps, immediate_next)

Return `true` when `proc` is no longer among `procs(ctx)`.

Returns `false` without checking membership when `ps` is not the same object as
`procs(ctx)`. `immediate_next` is accepted but not used by this function.
"""
function shall_remove_worker(ctx, proc, ps, immediate_next)
    ps !== procs(ctx) && return false
    return lock(ctx) do
        # Membership is checked while holding the context lock.
        !(proc in procs(ctx))
    end
end

Expand Down
15 changes: 15 additions & 0 deletions test/processors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,19 @@ end
@everywhere pop!(Dagger.PROCESSOR_CALLBACKS)
end
end

@testset "Modify workers in Context" begin
    ps = addprocs(4, exeflags="--project")
    # Always tear the workers down, even if something below throws, so that a failure
    # here does not leak processes into the following test sets.
    try
        @everywhere ps using Dagger

        ctx = Context(ps[1:2])

        Dagger.addprocs!(ctx, ps[3:end])
        @test map(p -> p.pid, procs(ctx)) == ps

        Dagger.rmprocs!(ctx, ps[3:end])
        @test map(p -> p.pid, procs(ctx)) == ps[1:2]
    finally
        wait(rmprocs(ps))
    end
end
end
102 changes: 71 additions & 31 deletions test/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ end
end
@everywhere (pop!(Dagger.PROCESSOR_CALLBACKS); empty!(Dagger.OSPROC_CACHE))

@testset "Add new workers" begin
# Test that we can add new workers to an ongoing task.
@testset "Modify workers in running job" begin
# Test that we can add/remove workers while scheduler is running.
# As this requires asynchronicity, a Condition is used to stall the tasks to
# ensure workers are actually added while the scheduler is working
using Distributed

# ensure workers are actually modified while the scheduler is working

setup = quote
using Dagger, Distributed
# Condition to guarantee that processing is not completed before we add new workers
# Note: c is used in expressions below
c = Condition()
function testfun(i)
i < 4 && return myid()
Expand All @@ -83,39 +83,79 @@ end
end
end

@testset "Add new workers" begin
    # Flat list of every worker pid started in this test set, so that all of them
    # are removed in the `finally` clause.
    ps = Int[]
    try
        ps1 = addprocs(2, exeflags="--project")
        append!(ps, ps1)

        @everywhere vcat(ps1, myid()) $setup

        ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...)

        ctx = Context(ps1)
        job = @async collect(ctx, ts)

        # Make sure the scheduler is running before the context is modified.
        while !istaskstarted(job)
            sleep(0.001)
        end

        # Will not be added, so they should never appear in output
        ps2 = addprocs(2, exeflags="--project")
        append!(ps, ps2)

        ps3 = addprocs(2, exeflags="--project")
        append!(ps, ps3)
        @everywhere ps3 $setup
        Dagger.addprocs!(ctx, ps3)
        @test length(procs(ctx)) == 4

        # Release the stalled tasks until the job completes.
        while !istaskdone(job)
            @everywhere ps1 notify(c)
            @everywhere ps3 notify(c)
        end
        @test fetch(job) isa Vector
        @test fetch(job) |> unique |> sort == vcat(ps1, ps3)

    finally
        wait(rmprocs(ps))
    end
end

@testset "Remove workers" begin
    # Flat list of every worker pid started in this test set, so that all of them
    # are removed in the `finally` clause.
    ps = Int[]
    try
        ps1 = addprocs(4, exeflags="--project")
        append!(ps, ps1)

        @everywhere vcat(ps1, myid()) $setup

        ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...)

        ctx = Context(ps1)
        job = @async collect(ctx, ts)

        # Make sure the scheduler is running before the context is modified.
        while !istaskstarted(job)
            sleep(0.001)
        end

        Dagger.rmprocs!(ctx, ps1[3:end])
        @test length(procs(ctx)) == 2

        # Release the stalled tasks until the job completes.
        while !istaskdone(job)
            @everywhere ps1 notify(c)
        end
        res = fetch(job)
        @test res isa Vector
        # First all four workers will report their IDs without hassle
        # Then all four will be waiting for the Condition
        # While they are waiting ps1[3:end] are removed, but when the Condition is notified they will finish their tasks before being removed
        @test res[1:8] |> unique |> sort == ps1
        @test res[9:end] |> unique |> sort == ps1[1:2]

    finally
        wait(rmprocs(ps))
    end
end
end
end

0 comments on commit 5e67f08

Please sign in to comment.