Skip to content

Commit

Permalink
Fault handler only uses procs(ctx)
Browse files Browse the repository at this point in the history
Worker death with options.single throws
  • Loading branch information
DrChainsaw committed Oct 4, 2020
1 parent 6a24c4a commit 70de5ee
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 14 deletions.
8 changes: 4 additions & 4 deletions src/fault-handler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ function handle_fault(ctx, state, thunk, oldproc, chan, node_order)
end

# Reschedule inputs from deadlist
newproc = OSProc(rand(workers()))
if newproc procs(ctx)
addprocs!(ctx, [newproc])
end
ps = procs(ctx)
@assert !isempty(ps) "No processes left!"
newproc = rand(ps)

while length(deadlist) > 0
dt = popfirst!(deadlist)
if any((input in deadlist) for input in dt.inputs)
Expand Down
9 changes: 7 additions & 2 deletions src/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
@warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work"

# Remove dead worker from procs list
# Not sure what is desired behaviour if option.singleworker is set...
rmprocs!(ctx, [proc])
remove_dead_proc!(ctx, proc)

handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan, node_order)
continue
Expand Down Expand Up @@ -188,6 +187,12 @@ end
# Decide whether `proc` should be removed: true when `proc` is not among the
# processors returned by `procs_to_use(ctx)`.
# Might be a good policy to not remove the proc if immediate_next
# (`immediate_next` is accepted but not consulted here).
shall_remove_proc(ctx, proc, immediate_next) = proc ∉ procs_to_use(ctx)

"""
    remove_dead_proc!(ctx, proc)
    remove_dead_proc!(ctx, options, proc)

Remove the dead processor `proc` from `ctx` by calling `rmprocs!(ctx, [proc])`.

The two-argument form forwards `ctx.options` as `options`.

# Throws
- `AssertionError`: if `options.single` is identical to `proc.pid`; in that case
  nothing is removed from `ctx`.
"""
remove_dead_proc!(ctx, proc) = remove_dead_proc!(ctx, ctx.options, proc)
function remove_dead_proc!(ctx, options, proc)
    # Throw explicitly instead of using `@assert`: assertions may be disabled at
    # some optimization levels, and this check must never be skipped. The
    # exception type and message are the same as an `@assert` would produce.
    options.single !== proc.pid || throw(AssertionError("Single worker failed!"))
    return rmprocs!(ctx, [proc])
end

function pop_with_affinity!(ctx, tasks, proc, immediate_next)
# allow JIT specialization on Pairs
mapfirst(c) = first.(c)
Expand Down
27 changes: 20 additions & 7 deletions test/fault-tolerance.jl
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
@testset "Fault tolerance" begin
function setup_funcs()
function setup_funcs(nofail)
@everywhere begin
$(Expr(:using, Expr(Symbol("."), :Dagger)))
function kill_eager(x)
_x = x+1
sleep(1)

_x == 2 && myid() != 1 && exit(1)
_x == 2 && myid() != $nofail && exit(1)

return _x
end
Expand All @@ -17,15 +17,18 @@
_x = sum(x)
sleep(1)

_x == 6 && myid() != 1 && exit(1)
_x == 6 && myid() != $nofail && exit(1)

return _x
end
end
end

setup_funcs()
## 2 workers will fail and exit while one (the last one) will complete the tasks
setup_funcs(workers() |> last)
for kill_func in (kill_eager, kill_lazy)
@test workers() |> length == 3

a = delayed(kill_func)(1)
b = delayed(kill_func)(a)
c = delayed(kill_func)(a)
Expand All @@ -34,7 +37,7 @@

addprocs(2)
using Dagger
setup_funcs()
setup_funcs(workers() |> last)

a = delayed(kill_func)(1)
b = delayed(kill_func)(delayed(kill_func)(a))
Expand All @@ -43,7 +46,7 @@

addprocs(2)
using Dagger
setup_funcs()
setup_funcs(workers() |> last)

a1 = delayed(kill_func)(1)
a2 = delayed(kill_func)(1)
Expand All @@ -55,6 +58,16 @@

addprocs(2)
using Dagger
setup_funcs()
setup_funcs(workers() |> last)

a = delayed(kill_func)(1)
b = delayed(kill_func)(a)
c = delayed(kill_func)(a)
d = delayed(kill_func)(b, c)
@test_throws AssertionError collect(d; options=Dagger.Sch.SchedulerOptions(single=first(workers())))

addprocs(1)
using Dagger
setup_funcs(workers() |> last)
end
end
2 changes: 1 addition & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using Distributed
addprocs(2)
addprocs(3)

using Test
using Dagger
Expand Down

0 comments on commit 70de5ee

Please sign in to comment.