From 0da0f53854b37532339a8caf7eedcec2991069da Mon Sep 17 00:00:00 2001 From: Kenta Sato Date: Sun, 4 Jun 2017 02:18:15 +0900 Subject: [PATCH 1/5] remove unused computation --- src/compute.jl | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/compute.jl b/src/compute.jl index c9b80cc9..d6ac31b1 100644 --- a/src/compute.jl +++ b/src/compute.jl @@ -200,10 +200,6 @@ function compute(ctx, d::Thunk) ndeps = noffspring(deps) ord = order(d, ndeps) - sort_ord = collect(ord) - sortord = x -> istask(x[1]) ? x[1].id : 0 - sort_ord = sort(sort_ord, by=sortord) - node_order = x -> -get(ord, x, 0) state = start_state(deps, node_order) # start off some tasks From 789ac7ba3e6c5a0009b5e28b9c6977e2229ba788 Mon Sep 17 00:00:00 2001 From: Kenta Sato Date: Sun, 4 Jun 2017 02:24:36 +0900 Subject: [PATCH 2/5] do not assign a value to ndeps --- src/compute.jl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/compute.jl b/src/compute.jl index d6ac31b1..c06ca63d 100644 --- a/src/compute.jl +++ b/src/compute.jl @@ -197,8 +197,7 @@ function compute(ctx, d::Thunk) ps = procs(ctx) chan = Channel{Any}(32) deps = dependents(d) - ndeps = noffspring(deps) - ord = order(d, ndeps) + ord = order(d, noffspring(deps)) node_order = x -> -get(ord, x, 0) state = start_state(deps, node_order) From 7a7b6322eaf7e06297018b40623bfd025b855de8 Mon Sep 17 00:00:00 2001 From: Kenta Sato Date: Sun, 4 Jun 2017 02:31:03 +0900 Subject: [PATCH 3/5] remove some inconsistent blank lines --- src/compute.jl | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/src/compute.jl b/src/compute.jl index c06ca63d..fc032b7d 100644 --- a/src/compute.jl +++ b/src/compute.jl @@ -213,21 +213,16 @@ function compute(ctx, d::Thunk) while !isempty(state[:waiting]) || !isempty(state[:ready]) || !isempty(state[:running]) proc, thunk_id, res = take!(chan) - if isa(res, CapturedException) || isa(res, RemoteException) rethrow(res) end node = _thunk_dict[thunk_id] @logmsg("W$(proc.pid) - $node ($(node.f)) input:$(node.inputs)") state[:cache][node] = res - #@show state[:cache] - #@show ord - # if any of this guy's dependents are waiting, - # update them - @dbg timespan_start(ctx, :scheduler, thunk_id, master) + # if any of this guy's dependents are waiting, update them + @dbg timespan_start(ctx, :scheduler, thunk_id, master) immediate_next = finish_task!(state, node, node_order) - if !isempty(state[:ready]) if immediate_next # fast path @@ -243,7 +238,6 @@ function compute(ctx, d::Thunk) end @dbg timespan_end(ctx, :scheduler, thunk_id, master) end - state[:cache][d] end @@ -337,7 +331,6 @@ function fire_task!(ctx, thunk, proc, state, chan, node_order) data = map(thunk.inputs) do x istask(x) ? state[:cache][x] : x end - async_apply(ctx, proc, thunk.id, thunk.f, data, chan, thunk.get_result, thunk.persist) end @@ -359,8 +352,6 @@ function dependents(node::Thunk, deps=Dict()) deps end - - """ recursively find the number of taks dependent on each task in the DAG. Input: dependents dict @@ -378,7 +369,6 @@ function noffspring(n, dpents) end end - """ Given a root node of the DAG, calculates a total order for tie-braking @@ -396,7 +386,6 @@ function order(node::Thunk, ndeps) end function order(nodes::AbstractArray, ndeps, c, output=Dict()) - for node in nodes c+=1 output[node] = c From e351b05db2978663dd7c0dff4253889761b90880 Mon Sep 17 00:00:00 2001 From: Kenta Sato Date: Sun, 4 Jun 2017 02:55:31 +0900 Subject: [PATCH 4/5] reorder code to make it easier to follow --- src/compute.jl | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/compute.jl b/src/compute.jl index fc032b7d..b4d07bf6 100644 --- a/src/compute.jl +++ b/src/compute.jl @@ -148,26 +148,25 @@ function thunkize(ctx, c::Cat) end thunkize(ctx, x::AbstractChunk) = x thunkize(ctx, x::Thunk) = x + function finish_task!(state, node, node_order; free=true) - deps = sort([i for i in state[:dependents][node]], by=node_order) - immediate_next = false if istask(node) && node.cache node.cache_ref = Nullable{Any}(state[:cache][node]) end - for dep in deps + immediate_next = false + for dep in sort!(collect(state[:dependents][node]), by=node_order) set = state[:waiting][dep] pop!(set, node) if isempty(set) pop!(state[:waiting], dep) - immediate_next = true push!(state[:ready], dep) + immediate_next = true end # todo: free data end for inp in inputs(node) if inp in keys(state[:waiting_data]) s = state[:waiting_data][inp] - #@show s if node in s pop!(s, node) end From 397bb58a4efb97c449c984f763aa3d9f60ddb6b4 Mon Sep 17 00:00:00 2001 From: Kenta Sato Date: Sun, 4 Jun 2017 03:37:37 +0900 Subject: [PATCH 5/5] recursive definition of node ordering function --- src/compute.jl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/compute.jl b/src/compute.jl index b4d07bf6..8a58f976 100644 --- a/src/compute.jl +++ b/src/compute.jl @@ -381,17 +381,16 @@ Args: - ndeps: result of `noffspring` """ function order(node::Thunk, ndeps) - order([node], ndeps, 0)[2] -end - -function order(nodes::AbstractArray, ndeps, c, output=Dict()) - for node in nodes - c+=1 - output[node] = c - nxt = sort(Any[n for n in inputs(node)], by=k->get(ndeps,k,0)) - c, output = order(nxt, ndeps, c, output) + function recur(nodes, s) + for n in nodes + output[n] = s += 1 + s = recur(sort!(collect(Any, inputs(n)), by=k->get(ndeps,k,0)), s) + end + return s end - c, output + output = Dict{Any,Int}() + recur([node], 0) + return output end function start_state(deps::Dict, node_order)