Skip to content

Commit

Permalink
Merge 085b3f1 into b77694b
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Aug 26, 2020
2 parents b77694b + 085b3f1 commit 470daf2
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 40 deletions.
1 change: 1 addition & 0 deletions docs/make.jl
Expand Up @@ -15,6 +15,7 @@ makedocs(;
"Home" => "index.md",
"Processors" => "processors.md",
"Scheduler Internals" => "scheduler-internals.md",
"Logging and Graphing" => "logging.md",
]
)

Expand Down
41 changes: 41 additions & 0 deletions docs/src/logging.md
@@ -0,0 +1,41 @@
# Logging and Graphing

Dagger's scheduler keeps track of the important and potentially expensive
actions it does, such as moving data between workers or executing thunks, and
tracks how much time and memory allocations these operations consume. Saving
this information somewhere accessible is disabled by default, but it's quite
easy to turn it on:

```julia
ctx = Context()
log = Dagger.LocalEventLog()
ctx.log_sink = log
```

Now anytime `ctx` is used as the context for a scheduler, the scheduler will
log events into `log`. A `LocalEventLog` logs information in-memory, and does
so on each worker. The default log object is a `NoOpLog`, which doesn't store
events at all. The `FilterLog` exists to allow writing events to a
user-defined location (such as a database, file, or network socket).

Once sufficient data has been accumulated into a `LocalEventLog`, it can be
gathered to a single host via `Dagger.get_logs!(log)`. The result is a
`Vector` of `Dagger.Timespan` objects, each of which describes metadata about
an operation that occurred and was logged by the scheduler. These events may be
introspected directly, or may also be rendered to a DOT-format string:

```julia
logs = Dagger.get_logs!(log)
str = Dagger.show_plan(logs)
```

`Dagger.show_plan` can also be called as `Dagger.show_plan(io::IO, logs)` to
write the graph to a file or other `IO` object. The string generated by this
function may be passed to an external tool like `Graphviz` for rendering. Note
that this method doesn't display input arguments to the DAG (non-`Thunk`s);
you can call `Dagger.show_plan(logs, thunk)`, where `thunk` is the output
`Thunk` of the DAG, to render argument nodes.

!!! note
`Dagger.get_logs!` clears out the event logs, so that old events don't mix
with new ones from future DAGs.
6 changes: 3 additions & 3 deletions src/lib/logging.jl
Expand Up @@ -46,11 +46,10 @@ create a timespan given the start and finish events
function make_timespan(start::Event, finish::Event)
@assert start.category == finish.category
@assert start.id == finish.id
@assert start.timeline == finish.timeline

Timespan(start.category,
start.id,
start.timeline,
finish.timeline,
start.timestamp,
finish.timestamp,
Base.GC_Diff(finish.gc_num,start.gc_num),
Expand Down Expand Up @@ -243,7 +242,8 @@ function get_logs!(::LocalEventLog)
log
end
end
build_timespans(vcat(values(logs)...)).completed
spans = build_timespans(vcat(values(logs)...)).completed
convert(Vector{Timespan}, spans)
end

function add_gc_diff(x,y)
Expand Down
3 changes: 3 additions & 0 deletions src/processor.jl
Expand Up @@ -76,6 +76,9 @@ The default implementation breaks a single `move` call down into a sequence of
`move` calls, and is not intended to be maximally efficient.
"""
function move(ctx, from_proc::Processor, to_proc::Processor, x)
if from_proc == to_proc
return x
end
@debug "Initiating generic move"
# Move to remote OSProc
@debug "(Remote) moving $parent_proc to $grandparent_proc"
Expand Down
49 changes: 31 additions & 18 deletions src/scheduler.jl
Expand Up @@ -222,19 +222,22 @@ function fire_task!(ctx, thunk, proc, state, chan, node_order)
end
end

ids = map(thunk.inputs) do x
istask(x) ? x.id : nothing
end
if thunk.meta
# Run it on the parent node
# do not move data.
# Run it on the parent node, do not move data.
p = OSProc(myid())
@dbg timespan_start(ctx, :comm, thunk.id, p)
fetched = map(thunk.inputs) do x
istask(x) ? state.cache[x] : x
fetched = map(Iterators.zip(thunk.inputs,ids)) do (x, id)
@dbg timespan_start(ctx, :comm, (thunk.id, id), (thunk.f, id))
x = istask(x) ? state.cache[x] : x
@dbg timespan_end(ctx, :comm, (thunk.id, id), (thunk.f, id))
return x
end
@dbg timespan_end(ctx, :comm, thunk.id, p)

@dbg timespan_start(ctx, :compute, thunk.id, p)
@dbg timespan_start(ctx, :compute, thunk.id, thunk.f)
res = thunk.f(fetched...)
@dbg timespan_end(ctx, :compute, thunk.id, p)
@dbg timespan_end(ctx, :compute, thunk.id, (thunk.f, typeof(res), sizeof(res)))

#push!(state.running, thunk)
state.cache[thunk] = res
Expand All @@ -261,7 +264,7 @@ function fire_task!(ctx, thunk, proc, state, chan, node_order)
if options.single > 0
proc = OSProc(options.single)
end
async_apply(ctx, proc, thunk.id, thunk.f, data, chan, thunk.get_result, thunk.persist, thunk.cache, options)
async_apply(ctx, proc, thunk.id, thunk.f, data, chan, thunk.get_result, thunk.persist, thunk.cache, options, ids)
end

function finish_task!(state, node, node_order; free=true)
Expand Down Expand Up @@ -327,30 +330,40 @@ function start_state(deps::Dict, node_order)
state
end

@noinline function do_task(ctx, proc, thunk_id, f, data, send_result, persist, cache, options)
@dbg timespan_start(ctx, :comm, thunk_id, proc)
fetched = map(x->x isa Union{Chunk,Thunk} ? collect(ctx, x) : x, data)
@dbg timespan_end(ctx, :comm, thunk_id, proc)
"""
    do_task(ctx, proc, thunk_id, f, data, send_result, persist, cache, options, ids)

Run a single thunk on this worker: fetch its inputs (logging a `:comm` span
per input id), move them to the chosen processor (logging a `:move` span per
input id), execute `f` (logging one `:compute` span), and return a
`(proc, thunk_id, result)` tuple where the result is the value (or a `Chunk`
when `send_result` is false) or a `RemoteException` on failure.
"""
@noinline function do_task(ctx, proc, thunk_id, f, data, send_result, persist, cache, options, ids)
    # Fetch each input; Chunks/Thunks are collected, plain values pass through.
    fetched = map(Iterators.zip(data,ids)) do (x, id)
        @dbg timespan_start(ctx, :comm, (thunk_id, id), (f, id))
        x = x isa Union{Chunk,Thunk} ? collect(ctx, x) : x
        @dbg timespan_end(ctx, :comm, (thunk_id, id), (f, id))
        return x
    end

    from_proc = proc
    # TODO: Time choose_processor?
    to_proc = choose_processor(from_proc, options, f, fetched)
    # Move each fetched input to the selected processor, one :move span each.
    fetched = map(Iterators.zip(fetched,ids)) do (x, id)
        @dbg timespan_start(ctx, :move, (thunk_id, id), (f, id))
        x = move(ctx, from_proc, to_proc, x)
        @dbg timespan_end(ctx, :move, (thunk_id, id), (f, id))
        return x
    end
    @dbg timespan_start(ctx, :compute, thunk_id, f)
    res = nothing
    result_meta = try
        res = execute!(to_proc, f, fetched...)
        (from_proc, thunk_id, send_result ? res : tochunk(res, to_proc; persist=persist, cache=persist ? true : cache)) #todo: add more metadata
    catch ex
        bt = catch_backtrace()
        # Capture the failure so the scheduler can surface it at the caller
        # instead of crashing this worker.
        (from_proc, thunk_id, RemoteException(myid(), CapturedException(ex, bt)))
    end
    @dbg timespan_end(ctx, :compute, thunk_id, (f, typeof(res), sizeof(res)))
    result_meta
end

@noinline function async_apply(ctx, p::OSProc, thunk_id, f, data, chan, send_res, persist, cache, options)
@noinline function async_apply(ctx, p::OSProc, thunk_id, f, data, chan, send_res, persist, cache, options, ids)
@async begin
try
put!(chan, remotecall_fetch(do_task, p.pid, ctx, p, thunk_id, f, data, send_res, persist, cache, options))
put!(chan, remotecall_fetch(do_task, p.pid, ctx, p, thunk_id, f, data, send_res, persist, cache, options, ids))
catch ex
bt = catch_backtrace()
put!(chan, (p, thunk_id, CapturedException(ex, bt)))
Expand Down
162 changes: 143 additions & 19 deletions src/ui/graph.jl
@@ -1,23 +1,31 @@
import Dagger: Thunk
export show_plan

function node_label(io, t::Thunk, c)
if isa(t.f, Function)
println(io, "$(t.id) [label=\"$(t.f) - $(t.id)\"]")
else
println(io, "$(t.id) [label=\"fn - $(t.id)\"]")
end
### DAG-based graphing

function write_node(io, t::Thunk, c)
f = isa(t.f, Function) ? "$(t.f)" : "fn"
println(io, "n_$(t.id) [label=\"$f - $(t.id)\"];")
c
end

function node_label(io, t,c)
l = replace(string(t), "\"", "")
println(io, dec(hash(t)), " [label=\"$l\"]")
# Decimal string of a value (no padding, no sign) — used to derive node ids.
dec(x) = Base.dec(x, 0, false)

"""
    write_node(io, t, c, id=dec(hash(t)))

Emit a DOT node statement for `t`, labeled with `node_label(t)` (double
quotes stripped so the label remains valid DOT). Returns `c` unchanged.
"""
function write_node(io, t, c, id=dec(hash(t)))
    label = replace(node_label(t), "\""=>"")
    println(io, "n_$id", " [label=\"$label\"];")
    return c
end

# Default label: the value's `show` representation.
function node_label(t)
    buf = IOBuffer()
    Base.show(buf, t)
    return String(take!(buf))
end

# Arrays get a richer label: element type, shape, and pretty-printed size.
function node_label(x::T) where T<:AbstractArray
    return "$T\nShape: $(size(x))\nSize: $(pretty_size(sizeof(x)))"
end

# Maps Chunk objects to their assigned DOT node names ("part_$c").
global _part_labels = Dict()

# Chunks emit no node statement here; they are registered in `_part_labels`
# (rendered later via `node_id`). Returns the incremented counter.
function write_node(io, t::Chunk, c)
    _part_labels[t] = "part_$c"
    return c + 1
end
Expand All @@ -34,12 +42,12 @@ function node_id(t::Chunk)
_part_labels[t]
end

function write_dag(io, t)
function write_dag(io, t::Thunk)
!istask(t) && return
deps = dependents(t)
c=1
for k in keys(deps)
c = node_label(io, k, c)
c = write_node(io, k, c)
end
for (k, v) in deps
for dep in v
Expand All @@ -50,13 +58,129 @@ function write_dag(io, t)
end
end

function show_plan(t::Thunk)
io = IOBuffer()
### Timespan-based graphing

pretty_time(ts::Timespan) = pretty_time(ts.finish-ts.start)
"""
    pretty_time(t)

Render a duration `t` in nanoseconds as a human-readable string, scaled to
the largest fitting unit (s/ms/us/ns) and rounded to 3 digits.
"""
function pretty_time(t)
    rnd(x) = round(x; digits=3)
    if t > 1000^3
        return "$(rnd(t/(1000^3))) s"
    end
    if t > 1000^2
        return "$(rnd(t/(1000^2))) ms"
    end
    t > 1000 && return "$(rnd(t/1000)) us"
    return "$(rnd(t)) ns"
end
"""
    pretty_size(sz)

Render a byte count `sz` as a human-readable string, scaled to the largest
fitting unit and rounded to 3 digits (consistent with `pretty_time`).

Note: unit boundaries use `>` (not `>=`), so exactly 1024 bytes renders as
"1024 B (bytes)".
"""
function pretty_size(sz)
    r(x) = round(x; digits=3)
    if sz > 1024^4
        "$(r(sz/(1024^4))) TB (terabytes)"
    elseif sz > 1024^3
        "$(r(sz/(1024^3))) GB (gigabytes)"
    elseif sz > 1024^2
        "$(r(sz/(1024^2))) MB (megabytes)"
    elseif sz > 1024
        "$(r(sz/1024)) KB (kilobytes)"
    else
        "$sz B (bytes)"
    end
end

# Emit a DOT node for a :compute timespan, labeled with the executed
# function's name (or "fn" for non-Function callables), the thunk id, the
# compute time, and the result's type and pretty-printed size.
# Returns the counter `c` unchanged.
function write_node(io, ts::Timespan, c)
    func, res_type, res_sz = ts.timeline
    fname = isa(func, Function) ? "$func" : "fn"
    time_str = pretty_time(ts)
    size_str = pretty_size(res_sz)
    label = "$fname - $(ts.id)\nCompute: $time_str\nResult Type: $res_type\nResult Size: $size_str"
    println(io, "n_$(ts.id) [label=\"$label\"]")
    return c
end

# Emit a DOT edge for a :comm timespan, from the input's node to the
# consuming thunk's node, labeled with the communication time and — when a
# matching :move event for the same (thunk, input) pair exists in `logs` —
# the move time as well.
function write_edge(io, ts_comm::Timespan, logs)
    _, input_id = ts_comm.timeline
    # FIXME: We should print these edges too
    if input_id === nothing
        return
    end
    print(io, "n_$input_id -> n_$(ts_comm.id[1]) [label=\"Comm: $(pretty_time(ts_comm))")
    is_matching_move(x) = x.category==:move &&
                          ts_comm.id==x.id &&
                          input_id==x.timeline[2]
    move_idx = findfirst(is_matching_move, logs)
    if move_idx !== nothing
        print(io, "\nMove: $(pretty_time(logs[move_idx]))")
    end
    println(io, "\"];")
end

write_edge(io, from::String, to::String) = println(io, "n_$from -> n_$to")

# Collect, per thunk id, the non-Thunk (concrete) inputs of `t`, recursing
# into Thunk inputs. Non-Thunk roots contribute nothing.
getargs!(d, t) = nothing
function getargs!(d, t::Thunk)
    # NOTE: the splat-into-literal form is kept as-is to preserve the
    # resulting vector's element type.
    d[t.id] = [filter(!istask, [t.inputs...,])...,]
    for input in t.inputs
        getargs!(d, input)
    end
end
"""
    write_dag(io, logs::Vector, t=nothing)

Write the body of a DOT graph built from scheduler log timespans: one node
per `:compute` event, plus — when the root thunk `t` is given — nodes for
each thunk's non-Thunk arguments, then edges for `:comm` events and edges
from argument nodes into their consuming thunks.
"""
function write_dag(io, logs::Vector, t=nothing)
    # Map thunk id => its non-Thunk input arguments (empty when t === nothing).
    argmap = Dict{Int,Vector}()
    getargs!(argmap, t)
    # Shared node counter threaded through write_node calls.
    c = 1
    for ts in filter(x->x.category==:compute, logs)
        c = write_node(io, ts, c)
    end
    # Per-thunk list of DOT node names for its arguments.
    argnodemap = Dict{Int,Vector{String}}()
    # Mutable arguments are deduplicated by identity so they share one node.
    argids = IdDict{Any,String}()
    for id in keys(argmap)
        nodes = String[]
        arg_c = 1
        for arg in argmap[id]
            name = "$(id)_arg_$(arg_c)"
            if !isimmutable(arg)
                if haskey(argids, arg)
                    # Node already emitted for this mutable argument; reuse it.
                    name = argids[arg]
                else
                    argids[arg] = name
                    c = write_node(io, arg, c, name)
                end
                push!(nodes, name)
            else
                # Immutable arguments always get their own node.
                c = write_node(io, arg, c, name)
                push!(nodes, name)
            end
            # Fix: advance the per-thunk argument counter so successive
            # arguments of the same thunk get distinct node names (the
            # original never incremented arg_c, collapsing them all to
            # "$(id)_arg_1").
            arg_c += 1
        end
        argnodemap[id] = nodes
    end
    # Edges for data movement between thunks.
    for ts in filter(x->x.category==:comm, logs)
        write_edge(io, ts, logs)
    end
    # Edges from argument nodes into their consuming thunks.
    for id in keys(argnodemap)
        for arg in argnodemap[id]
            write_edge(io, arg, string(id))
        end
    end
end

# Write a DOT digraph for `t` (a DAG root Thunk, or a Timespan vector via the
# matching `write_dag` method) to `io`.
#
# Fix: removed a leftover bare triple-quoted string whose interpolation
# `$(String(take!(io)))` destructively drained the IOBuffer before the
# closing "}" was written, discarding the entire graph.
function show_plan(io::IO, t)
    println(io, "digraph {")
    print(io, "graph [layout=dot, rankdir=TB];")
    write_dag(io, t)
    println(io, "}")
end
# Write a DOT digraph built from `logs`, with argument nodes taken from the
# DAG rooted at `t`, to `io`.
function show_plan(io::IO, logs::Vector{Timespan}, t::Thunk)
    println(io, "digraph {")
    print(io, "graph [layout=dot, rankdir=TB];")
    write_dag(io, logs, t)
    println(io, "}")
end

# Render the DOT graph for `t` (a DAG root Thunk or a Timespan log vector)
# into a String.
function show_plan(t::Union{Thunk,Vector{Timespan}})
    buf = IOBuffer()
    show_plan(buf, t)
    return String(take!(buf))
end
# Render the DOT graph for `logs`, with argument nodes from root thunk `t`,
# into a String.
function show_plan(logs::Vector{Timespan}, t::Thunk)
    buf = IOBuffer()
    show_plan(buf, logs, t)
    return String(take!(buf))
end

function show_plan(c)
Expand Down
1 change: 1 addition & 0 deletions test/runtests.jl
Expand Up @@ -10,6 +10,7 @@ include("domain.jl")
include("array.jl")
include("scheduler.jl")
include("processors.jl")
include("ui.jl")
include("fault-tolerance.jl")
println(stderr, "tests done. cleaning up...")
Dagger.cleanup()
Expand Down

0 comments on commit 470daf2

Please sign in to comment.