From ebefe0a3ef4578326dc6e6aa11deefe49b41a755 Mon Sep 17 00:00:00 2001 From: DrChainsaw Date: Thu, 1 Oct 2020 23:39:05 +0200 Subject: [PATCH] Add fault handler new proc to context --- src/fault-handler.jl | 3 +++ src/scheduler.jl | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/fault-handler.jl b/src/fault-handler.jl index f0996111..eb688dbf 100644 --- a/src/fault-handler.jl +++ b/src/fault-handler.jl @@ -124,6 +124,9 @@ function handle_fault(ctx, state, thunk, oldproc, chan, node_order) # Reschedule inputs from deadlist newproc = OSProc(rand(workers())) + if newproc ∉ procs(ctx) + addprocs!(ctx, [newproc]) + end while length(deadlist) > 0 dt = popfirst!(deadlist) if any((input in deadlist) for input in dt.inputs) diff --git a/src/scheduler.jl b/src/scheduler.jl index f28f7039..cf7eb04a 100644 --- a/src/scheduler.jl +++ b/src/scheduler.jl @@ -3,7 +3,7 @@ module Sch using Distributed import MemPool: DRef -import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs! +import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs!, addprocs! include("fault-handler.jl") @@ -112,7 +112,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) end # Note: worker_state may be different things for different contexts. Don't touch it out here! - worker_state = assign_new_workers!(ctx ,ps, state, chan, node_order, worker_state) + worker_state = assign_new_workers!(ctx, ps, state, chan, node_order, worker_state) if isempty(state.running) # the block above fired only meta tasks