Commit: Merge 681795a into accb8ad
DrChainsaw committed Oct 3, 2020
2 parents accb8ad + 681795a commit 9aaba46
Showing 6 changed files with 238 additions and 40 deletions.
45 changes: 45 additions & 0 deletions docs/src/processors.md
@@ -70,3 +70,48 @@ complicated detection and recovery process, including multiple master
processes, a distributed and replicated database such as etcd, and
checkpointing of the scheduler to ensure an efficient recovery. Such a system
does not yet exist, but contributions for such a change are desired.

## Dynamic worker pools

Dagger's default scheduler supports modifying the worker pool while the
scheduler is running. This is done by adding or removing `Processor`s from the
`Context` supplied to the scheduler at initialization, using
`addprocs!(ctx, ps)` and `rmprocs!(ctx, ps)`, where `ps` can be `Processor`s or
just process ids.

One example of when this is useful is in HPC environments, where the
individual jobs that start up workers are queued, so not all workers are
guaranteed to be available at the same time.

New workers will typically be assigned new tasks as soon as the scheduler
sees them. Removed workers will finish all their assigned tasks but will not
be assigned any new tasks. Note that this makes it difficult to determine
exactly when a removed worker is no longer in use by Dagger.

Example:

```julia
using Distributed

ps1 = addprocs(2, exeflags="--project");
@everywhere using Distributed, Dagger

# Dummy tasks that wait 0.5 seconds and then return the id of the worker they ran on
ts = delayed(vcat)((delayed(i -> (sleep(0.5); myid()))(i) for i in 1:20)...);

ctx = Context();
# Scheduler is blocking, so we need a new task to add workers while it runs
job = @async collect(ctx, ts);

# Let's fire up some new workers
ps2 = addprocs(2, exeflags="--project");
@everywhere using Distributed, Dagger
# New workers are not available until we do this
addprocs!(ctx, ps2)

# Let's hope the job didn't complete before the workers were added :)
fetch(job) |> unique

# and cleanup after ourselves...
workers() |> rmprocs
```
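
Removing workers from a running scheduler works the same way with `rmprocs!`. A minimal sketch, assuming the same `ctx`, `job` and `ps2` as above and that the job has not yet been fetched:

```julia
# Stop assigning new tasks to the workers added last
rmprocs!(ctx, ps2)

# Workers in ps2 still finish tasks already assigned to them, so their
# ids may appear in the result even though they are no longer in ctx
fetch(job) |> unique

# Only remove the Julia processes once Dagger is done with them
wait(rmprocs(ps2))
```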
3 changes: 3 additions & 0 deletions src/fault-handler.jl
@@ -124,6 +124,9 @@ function handle_fault(ctx, state, thunk, oldproc, chan, node_order)

# Reschedule inputs from deadlist
newproc = OSProc(rand(workers()))
if newproc ∉ procs(ctx)
addprocs!(ctx, [newproc])
end
while length(deadlist) > 0
dt = popfirst!(deadlist)
if any((input in deadlist) for input in dt.inputs)
43 changes: 41 additions & 2 deletions src/processor.jl
@@ -1,4 +1,4 @@
export OSProc, Context
export OSProc, Context, addprocs!, rmprocs!

"""
Processor
@@ -238,6 +238,7 @@ mutable struct Context
log_sink::Any
profile::Bool
options
proc_lock::ReentrantLock
end

"""
@@ -256,12 +257,15 @@ as a `Vector{Int}`.
function Context(xs)
Context(xs, NoOpLog(), false, nothing) # By default don't log events
end
Context(xs, log_sink, profile, options) = Context(xs, log_sink, profile, options, ReentrantLock())
Context(xs::Vector{Int}) = Context(map(OSProc, xs))
function Context()
procs = [OSProc(w) for w in workers()]
Context(procs)
end
procs(ctx::Context) = ctx.procs
procs(ctx::Context) = lock(ctx) do
copy(ctx.procs)
end

"""
write_event(ctx::Context, event::Event)
@@ -271,3 +275,38 @@ Write a log event
function write_event(ctx::Context, event::Event)
write_event(ctx.log_sink, event)
end

"""
lock(f, ctx::Context)
Acquire `ctx.proc_lock`, execute `f` with the lock held, and release the lock when `f` returns.
"""
Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock)

"""
addprocs!(ctx::Context, xs)
Add new workers `xs` to `ctx`.
Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.
Workers can be either `Processor`s or the underlying process ids as `Integer`s.
"""
addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs))
addprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
append!(ctx.procs, xs)
end
"""
rmprocs!(ctx::Context, xs)
Remove the specified workers `xs` from `ctx`.
Workers will typically finish all their assigned tasks if scheduling is ongoing but will not be assigned new tasks after removal.
Workers can be either `Processor`s or the underlying process ids as `Integer`s.
"""
rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs))
rmprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
filter!(p -> p ∉ xs, ctx.procs)
end

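Taken together, these additions give `Context` a small lock-protected API for mutating the worker pool at runtime. A rough usage sketch (the worker counts are illustrative, and `procs` is written as `Dagger.procs` just to be explicit about which method is meant):

```julia
using Distributed, Dagger

ps = addprocs(3, exeflags="--project")
@everywhere using Dagger

ctx = Context(ps[1:2])    # start with two of the new workers
addprocs!(ctx, ps[3:3])   # integer ids are wrapped into OSProc internally
rmprocs!(ctx, ps[1:1])    # stop scheduling on the first worker again

# Dagger.procs(ctx) returns a copy taken under ctx.proc_lock, so it can be
# iterated safely while another task mutates the pool
@show map(p -> p.pid, Dagger.procs(ctx))

# Longer read-modify-write sequences can hold the lock explicitly
lock(ctx) do
    length(ctx.procs)
end

wait(rmprocs(ps))
```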
66 changes: 48 additions & 18 deletions src/scheduler.jl
@@ -3,7 +3,7 @@ module Sch
using Distributed
import MemPool: DRef

import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!
import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs!, addprocs!

include("fault-handler.jl")

@@ -82,33 +82,32 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
master = OSProc(myid())
@dbg timespan_start(ctx, :scheduler_init, 0, master)

if options.single !== 0
@assert options.single in vcat(1, workers()) "Sch option 'single' must specify an active worker id"
ps = OSProc[OSProc(options.single)]
else
ps = procs(ctx)
end
chan = Channel{Any}(32)
deps = dependents(d)
ord = order(d, noffspring(deps))

node_order = x -> -get(ord, x, 0)
state = start_state(deps, node_order)
# start off some tasks
for p in ps
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
fire_task!(ctx, task, p, state, chan, node_order)
# Note: worker_state may be different things for different contexts. Don't touch it out here!
procs_state = assign_new_procs!(ctx, state, chan, node_order)
@dbg timespan_end(ctx, :scheduler_init, 0, master)

# Check periodically for new workers in a parallel task so that we don't accidentally end up
# having to wait for 'take!(chan)' on some large task before new workers are put to work
newprocs_lock = ReentrantLock()
@async while !isempty(state.ready) || !isempty(state.running)
sleep(1)
procs_state = lock(newprocs_lock) do
assign_new_procs!(ctx, state, chan, node_order, procs_state)
end
end
@dbg timespan_end(ctx, :scheduler_init, 0, master)

# Loop while we still have thunks to execute
while !isempty(state.ready) || !isempty(state.running)
if isempty(state.running) && !isempty(state.ready)
# Nothing running, so schedule up to N thunks, 1 per N workers
for p in ps
for p in procs_to_use(ctx)
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
@@ -117,6 +116,10 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
end
end

procs_state = lock(newprocs_lock) do
assign_new_procs!(ctx, state, chan, node_order, procs_state)
end

if isempty(state.running)
# the block above fired only meta tasks
continue
@@ -128,8 +131,8 @@
@warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work"

# Remove dead worker from procs list
filter!(p->p.pid!=proc.pid, ctx.procs)
ps = procs(ctx)
# Not sure what the desired behaviour is if options.single is set...
rmprocs!(ctx, [proc])

handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan, node_order)
continue
@@ -143,8 +146,8 @@

@dbg timespan_start(ctx, :scheduler, thunk_id, master)
immediate_next = finish_task!(state, node, node_order)
if !isempty(state.ready)
thunk = pop_with_affinity!(Context(ps), state.ready, proc, immediate_next)
if !isempty(state.ready) && !shall_remove_proc(ctx, proc, immediate_next)
thunk = pop_with_affinity!(Context(procs_to_use(ctx)), state.ready, proc, immediate_next)
if thunk !== nothing
fire_task!(ctx, thunk, proc, state, chan, node_order)
end
@@ -154,6 +157,33 @@
state.cache[d]
end

procs_to_use(ctx) = procs_to_use(ctx, ctx.options)
function procs_to_use(ctx, options)
return if options.single !== 0
@assert options.single in vcat(1, workers()) "Sch option 'single' must specify an active worker id"
OSProc[OSProc(options.single)]
else
procs(ctx)
end
end

# Main responsibility of this function is to check if new procs have been pushed to the context
function assign_new_procs!(ctx, state, chan, node_order, assignedprocs=[])
ps = procs_to_use(ctx)
# Must track individual procs to handle the case when procs are removed
for p in setdiff(ps, assignedprocs)
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
fire_task!(ctx, task, p, state, chan, node_order)
end
end
return ps
end

# Might be a good policy to not remove the proc if immediate_next
shall_remove_proc(ctx, proc, immediate_next) = proc ∉ procs_to_use(ctx)

function pop_with_affinity!(ctx, tasks, proc, immediate_next)
# allow JIT specialization on Pairs
mapfirst(c) = first.(c)
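The key idea in `assign_new_procs!` is to remember which processors have already been handed work and, on each pass, fire tasks only on the set difference, which also copes with processors being removed between passes. A self-contained sketch of that bookkeeping, using a hypothetical `fire!` and integer stand-ins for tasks rather than the real scheduler internals:

```julia
# Illustration only: fire! and the integer tasks are placeholders, not Dagger APIs
fire!(task, p) = println("firing task $task on $p")

function assign_new!(ready::Vector, current_procs, assigned)
    # Track individual procs rather than a count so that removals between
    # calls do not confuse the bookkeeping
    for p in setdiff(current_procs, assigned)
        isempty(ready) && break
        fire!(pop!(ready), p)
    end
    return current_procs   # becomes `assigned` on the next call
end

ready = collect(1:5)
assigned = assign_new!(ready, [:w2, :w3], [])
# later :w4 joins and :w2 leaves; only :w4 receives a task on this pass
assigned = assign_new!(ready, [:w3, :w4], assigned)
```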
15 changes: 15 additions & 0 deletions test/processors.jl
@@ -76,4 +76,19 @@ end
@everywhere pop!(Dagger.PROCESSOR_CALLBACKS)
end
end

@testset "Modify workers in Context" begin
ps = addprocs(4, exeflags="--project")
@everywhere ps using Dagger

ctx = Context(ps[1:2])

Dagger.addprocs!(ctx, ps[3:end])
@test map(p -> p.pid, procs(ctx)) == ps

Dagger.rmprocs!(ctx, ps[3:end])
@test map(p -> p.pid, procs(ctx)) == ps[1:2]

wait(rmprocs(ps))
end
end
106 changes: 86 additions & 20 deletions test/scheduler.jl
@@ -66,31 +66,97 @@ end
end
@everywhere (pop!(Dagger.PROCESSOR_CALLBACKS); empty!(Dagger.OSPROC_CACHE))

@testset "Add new workers" begin
using Distributed
ps1 = addprocs(2, exeflags="--project");

@everywhere begin
@testset "Modify workers in running job" begin
# Test that we can add/remove workers while the scheduler is running.
# As this requires asynchrony, a flag is used to stall the tasks to
# ensure workers are actually modified while the scheduler is working

setup = quote
using Dagger, Distributed
# Condition to guarantee that processing is not completed before we add new workers
c = Condition()
# blocked is used to guarantee that processing is not completed before we add new workers
# Note: blocked is used in expressions below
blocked = true
function testfun(i)
i < 2 && return myid()
wait(c)
return myid()
i < 4 && return myid()
# Wait for test to do its thing before we proceed
while blocked
sleep(0.001)
end
return myid()
end
end

ts = delayed(vcat)((delayed(testfun)(i) for i in 1:4)...);
job = @async collect(Context(ps1), ts);

ps2 = addprocs(2, exeflags="--project");

while !istaskdone(job)
@everywhere ps1 notify(c)

@testset "Add new workers" begin
ps = []
try
ps1 = addprocs(2, exeflags="--project")
append!(ps, ps1)

@everywhere vcat(ps1, myid()) $setup

ts = delayed(vcat)((delayed(testfun)(i) for i in 1:10)...)

ctx = Context(ps1)
job = @async collect(ctx, ts)

while !istaskstarted(job)
sleep(0.001)
end

# These workers are never added to the context, so they should not appear in the output
ps2 = addprocs(2, exeflags="--project")
append!(ps, ps2)

ps3 = addprocs(2, exeflags="--project")
append!(ps, ps3)
@everywhere ps3 $setup
addprocs!(ctx, ps3)
@test length(procs(ctx)) == 4

@everywhere vcat(ps1, ps3) blocked=false

@test fetch(job) isa Vector
@test fetch(job) |> unique |> sort == vcat(ps1, ps3)

finally
wait(rmprocs(ps))
end
end
@test fetch(job) |> unique |> sort == ps1

wait(rmprocs(vcat(ps1,ps2)))
@testset "Remove workers" begin
ps = []
try
ps1 = addprocs(4, exeflags="--project")
append!(ps, ps1)

@everywhere vcat(ps1, myid()) $setup

ts = delayed(vcat)((delayed(testfun)(i) for i in 1:16)...)

ctx = Context(ps1)
job = @async collect(ctx, ts)

while !istaskstarted(job)
sleep(0.001)
end

rmprocs!(ctx, ps1[3:end])
@test length(procs(ctx)) == 2

@everywhere ps1 blocked=false

res = fetch(job)
@test res isa Vector
# First all four workers will report their IDs without hassle
# Then all four will be waiting on the `blocked` flag
# While they are waiting ps1[3:end] are removed, but once `blocked` is cleared they will finish their assigned tasks before being removed
# Will probably break if workers are assigned more than one Thunk
@test res[1:8] |> unique |> sort == ps1
@test all(pid -> pid in ps1[1:2], res[9:end])

finally
wait(rmprocs(ps))
end
end
end
end
