Merge e4efe56 into accb8ad

DrChainsaw committed Oct 5, 2020
2 parents accb8ad + e4efe56 commit d2ebf3b
Showing 8 changed files with 304 additions and 50 deletions.
46 changes: 46 additions & 0 deletions docs/src/processors.md
@@ -70,3 +70,49 @@ complicated detection and recovery process, including multiple master
processes, a distributed and replicated database such as etcd, and
checkpointing of the scheduler to ensure an efficient recovery. Such a system
does not yet exist, but contributions for such a change are desired.

## Dynamic worker pools

Dagger's default scheduler supports modifying the worker pool while the
scheduler is running. This is done by adding or removing `Processor`s from the
`Context` supplied to the scheduler at initialization, using
`addprocs!(ctx, ps)` and `rmprocs!(ctx, ps)`, where `ps` can be `Processor`s or
just process ids.

This is useful, for example, in HPC environments where the jobs that start up
individual workers are queued separately, so not all workers are guaranteed to
be available at the same time.

New workers will typically be assigned new tasks as soon as the scheduler
sees them. Removed workers will finish all their assigned tasks but will not
be assigned any new tasks. Note that this makes it difficult to determine when
a worker is no longer in use by Dagger. Contributions to alleviate this
uncertainty are welcome!

Example:

```julia
using Distributed

ps1 = addprocs(2, exeflags="--project");
@everywhere using Distributed, Dagger

# Dummy tasks: each waits for 0.5 seconds and then returns the id of the worker it ran on
ts = delayed(vcat)((delayed(i -> (sleep(0.5); myid()))(i) for i in 1:20)...);

ctx = Context();
# Scheduler is blocking, so we need a new task to add workers while it runs
job = @async collect(ctx, ts);

# Let's fire up some new workers
ps2 = addprocs(2, exeflags="--project");
@everywhere ps2 using Distributed, Dagger
# New workers are not available until we do this
addprocs!(ctx, ps2)

# Let's hope the job didn't complete before the workers were added :)
fetch(job) |> unique

# and clean up after ourselves...
workers() |> rmprocs
```
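
Removing workers works the same way. Below is a minimal sketch (assuming the `ctx`, `ps1` and `ps2` variables from the example above, before the final `rmprocs` cleanup; the names `ts2` and `job2` are just illustrative): removed workers finish tasks already assigned to them but receive no new ones.

```julia
# Submit another batch of dummy tasks on the same context
ts2 = delayed(vcat)((delayed(i -> (sleep(0.5); myid()))(i) for i in 1:20)...);
job2 = @async collect(ctx, ts2);

# Remove the first batch of workers from the context; they may finish tasks
# already assigned to them, but will not be assigned any new ones
rmprocs!(ctx, ps1)

# The remaining tasks should now run only on the workers still in the context (ps2)
fetch(job2) |> unique
```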
5 changes: 4 additions & 1 deletion src/fault-handler.jl
@@ -123,7 +123,10 @@ function handle_fault(ctx, state, thunk, oldproc, chan, node_order)
end

# Reschedule inputs from deadlist
newproc = OSProc(rand(workers()))
ps = procs(ctx)
@assert !isempty(ps) "No processes left!"
newproc = rand(ps)

while length(deadlist) > 0
dt = popfirst!(deadlist)
if any((input in deadlist) for input in dt.inputs)
43 changes: 41 additions & 2 deletions src/processor.jl
@@ -1,4 +1,4 @@
export OSProc, Context
export OSProc, Context, addprocs!, rmprocs!

"""
Processor
@@ -238,6 +238,7 @@ mutable struct Context
log_sink::Any
profile::Bool
options
proc_lock::ReentrantLock
end

"""
@@ -256,12 +257,15 @@ as a `Vector{Int}`.
function Context(xs)
Context(xs, NoOpLog(), false, nothing) # By default don't log events
end
Context(xs, log_sink, profile, options) = Context(xs, log_sink, profile, options, ReentrantLock())
Context(xs::Vector{Int}) = Context(map(OSProc, xs))
function Context()
procs = [OSProc(w) for w in workers()]
Context(procs)
end
procs(ctx::Context) = ctx.procs
procs(ctx::Context) = lock(ctx) do
copy(ctx.procs)
end

"""
write_event(ctx::Context, event::Event)
@@ -271,3 +275,38 @@ Write a log event
function write_event(ctx::Context, event::Event)
write_event(ctx.log_sink, event)
end

"""
lock(f, ctx::Context)
Acquire `ctx.proc_lock`, execute `f` with the lock held, and release the lock when `f` returns.
"""
Base.lock(f, ctx::Context) = lock(f, ctx.proc_lock)

"""
addprocs!(ctx::Context, xs)
Add new workers `xs` to `ctx`.
Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.
Workers can be either `Processor`s or the underlying process ids as `Integer`s.
"""
addprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = addprocs!(ctx, map(OSProc, xs))
addprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
append!(ctx.procs, xs)
end
"""
rmprocs!(ctx::Context, xs)
Remove the specified workers `xs` from `ctx`.
Workers will typically finish all their assigned tasks if scheduling is ongoing but will not be assigned new tasks after removal.
Workers can be either `Processor`s or the underlying process ids as `Integer`s.
"""
rmprocs!(ctx::Context, xs::AbstractVector{<:Integer}) = rmprocs!(ctx, map(OSProc, xs))
rmprocs!(ctx::Context, xs::AbstractVector{<:Processor}) = lock(ctx) do
filter!(p -> p ∉ xs, ctx.procs)
end

83 changes: 64 additions & 19 deletions src/scheduler.jl
@@ -3,7 +3,7 @@ module Sch
using Distributed
import MemPool: DRef

import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!
import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs!, addprocs!

include("fault-handler.jl")

@@ -82,33 +82,22 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
master = OSProc(myid())
@dbg timespan_start(ctx, :scheduler_init, 0, master)

if options.single !== 0
@assert options.single in vcat(1, workers()) "Sch option 'single' must specify an active worker id"
ps = OSProc[OSProc(options.single)]
else
ps = procs(ctx)
end
chan = Channel{Any}(32)
deps = dependents(d)
ord = order(d, noffspring(deps))

node_order = x -> -get(ord, x, 0)
state = start_state(deps, node_order)
# start off some tasks
for p in ps
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
fire_task!(ctx, task, p, state, chan, node_order)
end
end
# Note: worker_state may be different things for different contexts. Don't touch it out here!
procs_state = assign_new_procs!(ctx, state, chan, node_order)
@dbg timespan_end(ctx, :scheduler_init, 0, master)

# Loop while we still have thunks to execute
while !isempty(state.ready) || !isempty(state.running)
if isempty(state.running) && !isempty(state.ready)
# Nothing running, so schedule up to N thunks, 1 per N workers
for p in ps
for p in procs_to_use(ctx)
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
@@ -117,19 +106,39 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
end
end

# This is somewhat redundant, as the @async task below does basically the same job.
# Without it, though, testing of process modification becomes non-deterministic
# (due to the sleep in the CI environment), which is why it is still here.
procs_state = assign_new_procs!(ctx, state, chan, node_order, procs_state)

if isempty(state.running)
# the block above fired only meta tasks
continue
end

check_integrity(ctx)
# Check periodically for new workers in a parallel task so that we don't accidentally end up
# waiting on 'take!(chan)' for some long-running task before new workers are put to work.
# The lock is used to stop this task as soon as something pops out of the channel, to minimize
# the risk that it schedules thunks at the same time as the main task (after future refactoring).
newtasks_lock = ReentrantLock()
@async while !isempty(state.ready) || !isempty(state.running)
sleep(1)
islocked(newtasks_lock) && return
procs_state = lock(newtasks_lock) do
assign_new_procs!(ctx, state, chan, node_order, procs_state)
end
end

proc, thunk_id, res = take!(chan) # get result of completed thunk
lock(newtasks_lock) # This waits for any assign_new_procs! above to complete and then shuts down the task

if isa(res, CapturedException) || isa(res, RemoteException)
if check_exited_exception(res)
@warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work"

# Remove dead worker from procs list
filter!(p->p.pid!=proc.pid, ctx.procs)
ps = procs(ctx)
remove_dead_proc!(ctx, proc)

handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan, node_order)
continue
@@ -143,8 +152,8 @@

@dbg timespan_start(ctx, :scheduler, thunk_id, master)
immediate_next = finish_task!(state, node, node_order)
if !isempty(state.ready)
thunk = pop_with_affinity!(Context(ps), state.ready, proc, immediate_next)
if !isempty(state.ready) && !shall_remove_proc(ctx, proc, immediate_next)
thunk = pop_with_affinity!(Context(procs_to_use(ctx)), state.ready, proc, immediate_next)
if thunk !== nothing
fire_task!(ctx, thunk, proc, state, chan, node_order)
end
@@ -154,6 +163,42 @@
state.cache[d]
end

procs_to_use(ctx) = procs_to_use(ctx, ctx.options)
function procs_to_use(ctx, options)
return if options.single !== 0
@assert options.single in vcat(1, workers()) "Sch option 'single' must specify an active worker id"
OSProc[OSProc(options.single)]
else
procs(ctx)
end
end

check_integrity(ctx) = check_integrity(ctx, ctx.options)
check_integrity(ctx, ::Any) = @assert !isempty(procs_to_use(ctx)) "No workers available!!"

# The main responsibility of this function is to check whether new procs have been pushed to the context
function assign_new_procs!(ctx, state, chan, node_order, assignedprocs=[])
ps = procs_to_use(ctx)
# Must track individual procs to handle the case when procs are removed
for p in setdiff(ps, assignedprocs)
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
fire_task!(ctx, task, p, state, chan, node_order)
end
end
return ps
end

# Might be a good policy to not remove the proc if immediate_next
shall_remove_proc(ctx, proc, immediate_next) = proc ∉ procs_to_use(ctx)

remove_dead_proc!(ctx, proc) = remove_dead_proc!(ctx, ctx.options, proc)
function remove_dead_proc!(ctx, options, proc)
@assert options.single !== proc.pid "Single worker failed!"
rmprocs!(ctx, [proc])
end

function pop_with_affinity!(ctx, tasks, proc, immediate_next)
# allow JIT specialization on Pairs
mapfirst(c) = first.(c)
27 changes: 20 additions & 7 deletions test/fault-tolerance.jl
@@ -1,12 +1,12 @@
@testset "Fault tolerance" begin
function setup_funcs()
function setup_funcs(nofail)
@everywhere begin
$(Expr(:using, Expr(Symbol("."), :Dagger)))
function kill_eager(x)
_x = x+1
sleep(1)

_x == 2 && myid() != 1 && exit(1)
_x == 2 && myid() != $nofail && exit(1)

return _x
end
@@ -17,15 +17,18 @@
_x = sum(x)
sleep(1)

_x == 6 && myid() != 1 && exit(1)
_x == 6 && myid() != $nofail && exit(1)

return _x
end
end
end

setup_funcs()
## 2 workers will fail and exit while one (the last one) will complete the tasks
setup_funcs(workers() |> last)
for kill_func in (kill_eager, kill_lazy)
@test workers() |> length == 3

a = delayed(kill_func)(1)
b = delayed(kill_func)(a)
c = delayed(kill_func)(a)
@@ -34,7 +37,7 @@

addprocs(2)
using Dagger
setup_funcs()
setup_funcs(workers() |> last)

a = delayed(kill_func)(1)
b = delayed(kill_func)(delayed(kill_func)(a))
@@ -43,7 +46,7 @@

addprocs(2)
using Dagger
setup_funcs()
setup_funcs(workers() |> last)

a1 = delayed(kill_func)(1)
a2 = delayed(kill_func)(1)
@@ -55,6 +58,16 @@

addprocs(2)
using Dagger
setup_funcs()
setup_funcs(workers() |> last)

a = delayed(kill_func)(1)
b = delayed(kill_func)(a)
c = delayed(kill_func)(a)
d = delayed(kill_func)(b, c)
@test_throws AssertionError collect(d; options=Dagger.Sch.SchedulerOptions(single=first(workers())))

addprocs(1)
using Dagger
setup_funcs(workers() |> last)
end
end
15 changes: 15 additions & 0 deletions test/processors.jl
@@ -76,4 +76,19 @@ end
@everywhere pop!(Dagger.PROCESSOR_CALLBACKS)
end
end

@testset "Modify workers in Context" begin
ps = addprocs(4, exeflags="--project")
@everywhere ps using Dagger

ctx = Context(ps[1:2])

Dagger.addprocs!(ctx, ps[3:end])
@test map(p -> p.pid, procs(ctx)) == ps

Dagger.rmprocs!(ctx, ps[3:end])
@test map(p -> p.pid, procs(ctx)) == ps[1:2]

wait(rmprocs(ps))
end
end
2 changes: 1 addition & 1 deletion test/runtests.jl
@@ -1,5 +1,5 @@
using Distributed
addprocs(2)
addprocs(3)

using Test
using Dagger