Skip to content

Commit

Permalink
hack to rethrow serialization errors that happen on the workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Shashi Gowda committed Apr 11, 2017
1 parent f80caad commit 30a641f
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions src/compute.jl
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ function compute(ctx, d::Thunk)
master = OSProc(myid())
@dbg timespan_start(ctx, :scheduler_init, 0, master)
ps = procs(ctx)
chan = RemoteChannel()
chan = Channel{Any}(32)
deps = dependents(d)
ndeps = noffspring(deps)
ord = order(d, ndeps)
Expand All @@ -215,7 +215,7 @@ function compute(ctx, d::Thunk)
while !isempty(state[:waiting]) || !isempty(state[:ready]) || !isempty(state[:running])
proc, thunk_id, res = take!(chan)

if isa(res, CapturedException)
if isa(res, CapturedException) || isa(res, RemoteException)
rethrow(res)
end
node = _thunk_dict[thunk_id]
Expand Down Expand Up @@ -381,25 +381,32 @@ end
"""
    _move(ctx, to_proc, x)

Generic fallback: return `x` unchanged. `ctx` and `to_proc` are ignored by this method.
"""
function _move(ctx, to_proc, x)
    return x
end
"""
    _move(ctx, to_proc::OSProc, x::AbstractChunk)

Specialization for an `AbstractChunk` headed to an `OSProc`: return `gather(ctx, x)`.
`to_proc` is used only for dispatch.
"""
function _move(ctx, to_proc::OSProc, x::AbstractChunk)
    return gather(ctx, x)
end

function do_task(ctx, proc, thunk_id, f, data, chan, send_result, persist)
@dbg timespan_start(ctx, :comm, thunk_id, proc)
fetched = map(x->_move(ctx, proc, x), data)
@dbg timespan_end(ctx, :comm, thunk_id, proc)
"""
    do_task(ctx, proc, thunk_id, f, data, send_result, persist)

Pass every element of `data` through `_move(ctx, proc, x)`, call `f` on the moved
values, and return a `(proc, thunk_id, payload)` tuple.

`payload` is the raw result of `f` when `send_result` is true; otherwise it is
`tochunk(result, persist=persist)`. If `f` or `tochunk` throws, the exception is not
propagated: `payload` is instead a `CapturedException` wrapping the exception and
its backtrace. Errors raised while moving the inputs are not caught here.
"""
function do_task(ctx, proc, thunk_id, f, data, send_result, persist)
    @dbg timespan_start(ctx, :comm, thunk_id, proc)
    inputs = map(data) do x
        _move(ctx, proc, x)
    end
    @dbg timespan_end(ctx, :comm, thunk_id, proc)

    @dbg timespan_start(ctx, :compute, thunk_id, proc)
    outcome = try
        value = f(inputs...)
        payload = if send_result
            value
        else
            tochunk(value, persist=persist)
        end
        (proc, thunk_id, payload) #todo: add more metadata
    catch err
        # Capture rather than throw so the caller receives the failure as data.
        (proc, thunk_id, CapturedException(err, catch_backtrace()))
    end
    @dbg timespan_end(ctx, :compute, thunk_id, proc)
    return outcome
end

@dbg timespan_start(ctx, :compute, thunk_id, proc)
function async_apply(ctx, p::OSProc, thunk_id, f, data, chan, send_res, persist)
@schedule begin
try
res = f(fetched...)
put!(chan, (proc, thunk_id, send_result ? res : tochunk(res, persist=persist))) #todo: add more metadata
put!(chan, Base.remotecall_fetch(do_task, p.pid, ctx, p, thunk_id, f, data, send_res, persist))
catch ex
bt = catch_backtrace()
put!(chan, (proc, thunk_id, CapturedException(ex, bt)))
put!(chan, (p.pid, thunk_id, ex))
end
@dbg timespan_end(ctx, :compute, thunk_id, proc)
nothing
end

function async_apply(ctx, p::OSProc, thunk_id, f, data, chan, send_res, persist)
Base.remote_do(do_task, p.pid, ctx, p, thunk_id, f, data, chan, send_res, persist)
nothing
end
end

function debug_compute(ctx::Context, args...; profile=false)
Expand Down

0 comments on commit 30a641f

Please sign in to comment.