From 2aa4cc6435dcef11b2f60f6f5f96ca3717b1e307 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Thu, 29 Oct 2020 11:11:16 -0500 Subject: [PATCH] Scheduler log_sink cleanup, doc proctypes --- src/scheduler.jl | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/scheduler.jl b/src/scheduler.jl index d47cc033..3c005a8a 100644 --- a/src/scheduler.jl +++ b/src/scheduler.jl @@ -42,6 +42,9 @@ Stores DAG-global options to be passed to the Dagger.Sch scheduler. # Arguments - `single::Int=0`: Force all work onto worker with specified id. `0` disables this option. +- `proctypes::Vector{Type{<:Processor}}=Type[]`: Force scheduler to use one or +more processors that are instances/subtypes of a contained type. Leave this +vector empty to disable. """ Base.@kwdef struct SchedulerOptions single::Int = 0 @@ -375,8 +378,8 @@ function start_state(deps::Dict, node_order) state end -@noinline function do_task(thunk_id, f, data, send_result, persist, cache, options, ids, logsink) - ctx = Context(Processor[], logsink, false, nothing) +@noinline function do_task(thunk_id, f, data, send_result, persist, cache, options, ids, log_sink) + ctx = Context(Processor[]; log_sink=log_sink) proc = OSProc() fetched = map(Iterators.zip(data,ids)) do (x, id) @dbg timespan_start(ctx, :comm, (thunk_id, id), (f, id)) @@ -407,10 +410,10 @@ end result_meta end -@noinline function async_apply(p::OSProc, thunk_id, f, data, chan, send_res, persist, cache, options, ids, logsink) +@noinline function async_apply(p::OSProc, thunk_id, f, data, chan, send_res, persist, cache, options, ids, log_sink) @async begin try - put!(chan, remotecall_fetch(do_task, p.pid, thunk_id, f, data, send_res, persist, cache, options, ids, logsink)) + put!(chan, remotecall_fetch(do_task, p.pid, thunk_id, f, data, send_res, persist, cache, options, ids, log_sink)) catch ex bt = catch_backtrace() put!(chan, (p, thunk_id, CapturedException(ex, bt)))