Skip to content

Commit

Permalink
Merge pull request #61 from bicycle1885/refactor-compute
Browse files Browse the repository at this point in the history
Refactor compute.jl
  • Loading branch information
shashi committed Jun 4, 2017
2 parents dce6f2c + 397bb58 commit e3251bc
Showing 1 changed file with 16 additions and 34 deletions.
50 changes: 16 additions & 34 deletions src/compute.jl
Original file line number Diff line number Diff line change
Expand Up @@ -148,26 +148,25 @@ function thunkize(ctx, c::Cat)
end
thunkize(ctx, x::AbstractChunk) = x
thunkize(ctx, x::Thunk) = x

function finish_task!(state, node, node_order; free=true)
deps = sort([i for i in state[:dependents][node]], by=node_order)
immediate_next = false
if istask(node) && node.cache
node.cache_ref = Nullable{Any}(state[:cache][node])
end
for dep in deps
immediate_next = false
for dep in sort!(collect(state[:dependents][node]), by=node_order)
set = state[:waiting][dep]
pop!(set, node)
if isempty(set)
pop!(state[:waiting], dep)
immediate_next = true
push!(state[:ready], dep)
immediate_next = true
end
# todo: free data
end
for inp in inputs(node)
if inp in keys(state[:waiting_data])
s = state[:waiting_data][inp]
#@show s
if node in s
pop!(s, node)
end
Expand Down Expand Up @@ -197,12 +196,7 @@ function compute(ctx, d::Thunk)
ps = procs(ctx)
chan = Channel{Any}(32)
deps = dependents(d)
ndeps = noffspring(deps)
ord = order(d, ndeps)

sort_ord = collect(ord)
sortord = x -> istask(x[1]) ? x[1].id : 0
sort_ord = sort(sort_ord, by=sortord)
ord = order(d, noffspring(deps))

node_order = x -> -get(ord, x, 0)
state = start_state(deps, node_order)
Expand All @@ -218,21 +212,16 @@ function compute(ctx, d::Thunk)

while !isempty(state[:waiting]) || !isempty(state[:ready]) || !isempty(state[:running])
proc, thunk_id, res = take!(chan)

if isa(res, CapturedException) || isa(res, RemoteException)
rethrow(res)
end
node = _thunk_dict[thunk_id]
@logmsg("W$(proc.pid) - $node ($(node.f)) input:$(node.inputs)")
state[:cache][node] = res
#@show state[:cache]
#@show ord
# if any of this guy's dependents are waiting,
# update them
@dbg timespan_start(ctx, :scheduler, thunk_id, master)

# if any of this guy's dependents are waiting, update them
@dbg timespan_start(ctx, :scheduler, thunk_id, master)
immediate_next = finish_task!(state, node, node_order)

if !isempty(state[:ready])
if immediate_next
# fast path
Expand All @@ -248,7 +237,6 @@ function compute(ctx, d::Thunk)
end
@dbg timespan_end(ctx, :scheduler, thunk_id, master)
end

state[:cache][d]
end

Expand Down Expand Up @@ -342,7 +330,6 @@ function fire_task!(ctx, thunk, proc, state, chan, node_order)
data = map(thunk.inputs) do x
istask(x) ? state[:cache][x] : x
end

async_apply(ctx, proc, thunk.id, thunk.f, data, chan, thunk.get_result, thunk.persist)
end

Expand All @@ -364,8 +351,6 @@ function dependents(node::Thunk, deps=Dict())
deps
end



"""
recursively find the number of taks dependent on each task in the DAG.
Input: dependents dict
Expand All @@ -383,7 +368,6 @@ function noffspring(n, dpents)
end
end


"""
Given a root node of the DAG, calculates a total order for tie-braking
Expand All @@ -397,18 +381,16 @@ Args:
- ndeps: result of `noffspring`
"""
function order(node::Thunk, ndeps)
order([node], ndeps, 0)[2]
end

function order(nodes::AbstractArray, ndeps, c, output=Dict())

for node in nodes
c+=1
output[node] = c
nxt = sort(Any[n for n in inputs(node)], by=k->get(ndeps,k,0))
c, output = order(nxt, ndeps, c, output)
function recur(nodes, s)
for n in nodes
output[n] = s += 1
s = recur(sort!(collect(Any, inputs(n)), by=k->get(ndeps,k,0)), s)
end
return s
end
c, output
output = Dict{Any,Int}()
recur([node], 0)
return output
end

function start_state(deps::Dict, node_order)
Expand Down

0 comments on commit e3251bc

Please sign in to comment.