Skip to content

Commit

Permalink
Scheduler log_sink cleanup, doc proctypes
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Oct 29, 2020
1 parent 9bba6c7 commit 2aa4cc6
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions src/scheduler.jl
Expand Up @@ -42,6 +42,9 @@ Stores DAG-global options to be passed to the Dagger.Sch scheduler.
# Arguments
- `single::Int=0`: Force all work onto worker with specified id. `0` disables this option.
- `proctypes::Vector{Type{<:Processor}}=Type[]`: Force scheduler to use one or
more processors that are instances/subtypes of a contained type. Leave this
vector empty to disable.
"""
Base.@kwdef struct SchedulerOptions
single::Int = 0
Expand Down Expand Up @@ -375,8 +378,8 @@ function start_state(deps::Dict, node_order)
state
end

@noinline function do_task(thunk_id, f, data, send_result, persist, cache, options, ids, logsink)
ctx = Context(Processor[], logsink, false, nothing)
@noinline function do_task(thunk_id, f, data, send_result, persist, cache, options, ids, log_sink)
ctx = Context(Processor[]; log_sink=log_sink)
proc = OSProc()
fetched = map(Iterators.zip(data,ids)) do (x, id)
@dbg timespan_start(ctx, :comm, (thunk_id, id), (f, id))
Expand Down Expand Up @@ -407,10 +410,10 @@ end
result_meta
end

@noinline function async_apply(p::OSProc, thunk_id, f, data, chan, send_res, persist, cache, options, ids, logsink)
@noinline function async_apply(p::OSProc, thunk_id, f, data, chan, send_res, persist, cache, options, ids, log_sink)
@async begin
try
put!(chan, remotecall_fetch(do_task, p.pid, thunk_id, f, data, send_res, persist, cache, options, ids, logsink))
put!(chan, remotecall_fetch(do_task, p.pid, thunk_id, f, data, send_res, persist, cache, options, ids, log_sink))
catch ex
bt = catch_backtrace()
put!(chan, (p, thunk_id, CapturedException(ex, bt)))
Expand Down

0 comments on commit 2aa4cc6

Please sign in to comment.