Merge pull request #141 from JuliaParallel/jps/worker-pressure
Added worker pressure monitoring
jpsamaroo committed Nov 5, 2020
2 parents 9e696f7 + fdd68f9 commit c46a395
Showing 8 changed files with 115 additions and 78 deletions.
2 changes: 1 addition & 1 deletion Project.toml
@@ -16,7 +16,7 @@ Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"

[compat]
MemPool = "0.3"
MemPool = "0.3.2"
StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33"
julia = "1.0"

3 changes: 2 additions & 1 deletion src/array/alloc.jl
@@ -61,7 +61,8 @@ Base.ones(p::Blocks, dims::Integer...) = ones(p, Float64, dims)
Base.ones(p::Blocks, dims::Tuple) = ones(p, Float64, dims)

function Base.zeros(p::Blocks, eltype::Type, dims)
AllocateArray(eltype, (_, x...) -> zeros(x...), ArrayDomain(map(x->1:x, dims)), p)
d = ArrayDomain(map(x->1:x, dims))
AllocateArray(eltype, (_, x...) -> zeros(x...), d, partition(p, d))
end
Base.zeros(p::Blocks, t::Type, dims::Integer...) = zeros(p, t, dims)
Base.zeros(p::Blocks, dims::Integer...) = zeros(p, Float64, dims)
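A usage sketch of the changed allocator (assuming `Blocks(2, 2)` constructs the block partitioner and `collect` materializes the lazy array, as elsewhere in Dagger's array API; neither appears in this hunk):

```julia
using Dagger

# Lazily allocate a 4x4 Float64 array of zeros split into 2x2 blocks;
# the domain is now partitioned up front via `partition(p, d)`.
dz = zeros(Blocks(2, 2), 4, 4)

# Materialize into an ordinary Array on the caller.
A = collect(dz)
```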
17 changes: 0 additions & 17 deletions src/chunks.jl
@@ -97,20 +97,6 @@ function affinity(r::FileRef)
end
end

function Serialization.deserialize(io::AbstractSerializer, dt::Type{Chunk{T,H,P}}) where {T,H,P}
nf = fieldcount(dt)
c = ccall(:jl_new_struct_uninit, Any, (Any,), dt)
Serialization.deserialize_cycle(io, c)
for i in 1:nf
tag = Int32(read(io.io, UInt8)::UInt8)
if tag != Serialization.UNDEFREF_TAG
ccall(:jl_set_nth_field, Cvoid, (Any, Csize_t, Any), c, i-1, Serialization.handle_deserialize(io, tag))
end
end
myid() == 1 && nworkers() > 1 && finalizer(x->@async(free!(x)), c)
c
end

"""
tochunk(x, proc; persist=false, cache=false) -> Chunk
@@ -132,13 +118,10 @@ function free!(s::Chunk{X, DRef, P}; force=true, cache=false) where {X,P}
catch err
isa(err, KeyError) || rethrow(err)
end
else
pooldelete(s.handle) # remove immediately
end
end
end
free!(x; force=true,cache=false) = x # catch-all for non-chunks
free!(x::DRef) = pooldelete(x)

function savechunk(data, dir, f)
sz = open(joinpath(dir, f), "w") do io
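A minimal sketch of the simplified cleanup path, assuming `tochunk` behaves as its docstring above states and that `OSProc()` denotes the local process. With the custom `deserialize` finalizer removed, releasing DRef-backed data leans on MemPool's own bookkeeping (hence the `MemPool = "0.3.2"` compat bump) and the new catch-all `free!(x::DRef) = pooldelete(x)`:

```julia
using Distributed
using Dagger
import Dagger: tochunk, free!, OSProc

# Wrap a local value into a DRef-backed Chunk.
c = tochunk(rand(4, 4), OSProc())

# Release it; DRef handles ultimately go through MemPool's `pooldelete`.
free!(c)
```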
4 changes: 2 additions & 2 deletions src/fault-handler.jl
@@ -26,7 +26,7 @@ of DAGs, it *may* cause a `KeyError` or other failures in the scheduler due to
the complexity of getting the internal state back to a consistent and proper
state.
"""
function handle_fault(ctx, state, thunk, oldproc, chan, node_order)
function handle_fault(ctx, state, thunk, oldproc, chan)
# Find thunks whose results were cached on the dead worker and place them
# on what's called a "deadlist". This structure will direct the recovery
# of the scheduler's state.
@@ -134,7 +134,7 @@ function handle_fault(ctx, state, thunk, oldproc, chan, node_order)
push!(deadlist, dt)
continue
end
fire_task!(ctx, dt, newproc, state, chan, node_order)
fire_task!(ctx, dt, newproc, state, chan)
break
end
end
8 changes: 8 additions & 0 deletions src/processor.jl
@@ -123,6 +123,13 @@ function move_to_osproc(parent_proc, x)
return x, parent_proc
end

"""
capacity(proc::Processor=OSProc()) -> Int
Returns the total processing capacity of `proc`.
"""
capacity(proc=OSProc()) = length(get_processors(proc))

"""
OSProc <: Processor
@@ -273,6 +280,7 @@ Context(procs::Vector{P}=Processor[OSProc(w) for w in workers()];
profile=false, options=nothing) where {P<:Processor} =
Context(procs, proc_lock, log_sink, log_file, profile, options)
Context(xs::Vector{Int}) = Context(map(OSProc, xs))
Context() = Context([OSProc(w) for w in workers()])
procs(ctx::Context) = lock(ctx) do
copy(ctx.procs)
end
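A brief sketch of how the new `capacity` query is meant to be used (the `get_processors` it calls is assumed from the surrounding file; the remote call mirrors what the scheduler now does at startup):

```julia
using Distributed
import Dagger: capacity, OSProc

# Capacity of the local OS process: the number of attached processors.
capacity()              # same as capacity(OSProc())

# Query a remote worker, as the scheduler's init loop does.
w = first(workers())
remotecall_fetch(capacity, w)
```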
147 changes: 95 additions & 52 deletions src/scheduler.jl
@@ -3,7 +3,7 @@ module Sch
using Distributed
import MemPool: DRef

import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, choose_processor, execute!, rmprocs!, addprocs!
import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, choose_processor, execute!, rmprocs!, addprocs!

include("fault-handler.jl")

@@ -23,6 +23,9 @@ Fields
- cache::Dict{Thunk, Any} - Maps from a finished `Thunk` to its cached result, often a DRef
- running::Set{Thunk} - The set of currently-running `Thunk`s
- thunk_dict::Dict{Int, Any} - Maps from thunk IDs to a `Thunk`
- node_order::Any - Function that returns the order of a thunk
- worker_pressure::Dict{Int,Int} - Cache of worker pressure
- worker_capacity::Dict{Int,Int} - Maps from worker ID to capacity
"""
struct ComputeState
dependents::OneToMany
@@ -33,6 +36,9 @@ struct ComputeState
cache::Dict{Thunk, Any}
running::Set{Thunk}
thunk_dict::Dict{Int, Any}
node_order::Any
worker_pressure::Dict{Int,Int}
worker_capacity::Dict{Int,Int}
end

"""
@@ -77,6 +83,9 @@ end
function cleanup(ctx)
end

"Process-local count of actively-executing Dagger tasks."
const ACTIVE_TASKS = Threads.Atomic{Int}(0)

function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
if options === nothing
options = SchedulerOptions()
@@ -91,29 +100,30 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())

node_order = x -> -get(ord, x, 0)
state = start_state(deps, node_order)

# Initialize pressure and capacity
@sync for p in procs_to_use(ctx)
state.worker_pressure[p.pid] = 0
@async state.worker_capacity[p.pid] = remotecall_fetch(capacity, p.pid)
end

# start off some tasks
# Note: worker_state may be different things for different contexts. Don't touch it out here!
procs_state = assign_new_procs!(ctx, state, chan, node_order)
# Note: procs_state may be different things for different contexts. Don't touch it out here!
procs_state = assign_new_procs!(ctx, state, chan, procs_to_use(ctx))
@dbg timespan_end(ctx, :scheduler_init, 0, master)

# Loop while we still have thunks to execute
while !isempty(state.ready) || !isempty(state.running)
if isempty(state.running) && !isempty(state.ready)
# Nothing running, so schedule up to N thunks, 1 per N workers
for p in procs_to_use(ctx)
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
fire_task!(ctx, task, p, state, chan, node_order)
end
end
schedule!(ctx, state, chan, procs_state)
end

# This is a bit redundant, as the @async task below does basically the
# same job. Without it, though, testing of process modification becomes
# non-deterministic (due to sleep in CI environments), which is why it is
# still here.
procs_state = assign_new_procs!(ctx, state, chan, node_order, procs_state)
procs_state = assign_new_procs!(ctx, state, chan, procs_state)

if isempty(state.running)
# the block above fired only meta tasks
@@ -132,37 +142,34 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
sleep(1)
islocked(newtasks_lock) && return
procs_state = lock(newtasks_lock) do
assign_new_procs!(ctx, state, chan, node_order, procs_state)
assign_new_procs!(ctx, state, chan, procs_state)
end
end

proc, thunk_id, res = take!(chan) # get result of completed thunk
proc, thunk_id, res, metadata = take!(chan) # get result of completed thunk
lock(newtasks_lock) # This waits for any assign_new_procs! above to complete and then shuts down the task

if isa(res, CapturedException) || isa(res, RemoteException)
if check_exited_exception(res)
@warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work"

# Remove dead worker from procs list
remove_dead_proc!(ctx, proc)

handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan, node_order)
handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan)
continue
else
throw(res)
end
end
state.worker_pressure[proc.pid] = metadata.pressure
node = state.thunk_dict[thunk_id]
@logmsg("WORKER $(proc.pid) - $node ($(node.f)) input:$(node.inputs)")
state.cache[node] = res

@dbg timespan_start(ctx, :scheduler, thunk_id, master)
immediate_next = finish_task!(state, node, node_order)
immediate_next = finish_task!(state, node)
if !isempty(state.ready) && !shall_remove_proc(ctx, proc)
thunk = pop_with_affinity!(Context(procs_to_use(ctx)), state.ready, proc, immediate_next)
if thunk !== nothing
fire_task!(ctx, thunk, proc, state, chan, node_order)
end
pop_and_fire!(ctx, state, chan, proc; immediate_next=immediate_next)
end
@dbg timespan_end(ctx, :scheduler, thunk_id, master)
end
Expand All @@ -180,16 +187,56 @@ end

check_integrity(ctx) = @assert !isempty(procs_to_use(ctx)) "No suitable workers available in context."

function schedule!(ctx, state, chan, procs=procs_to_use(ctx))
@assert length(procs) > 0
proc_keys = map(x->x.pid, procs)
proc_ratios = Dict(p=>(state.worker_pressure[p]/state.worker_capacity[p]) for p in proc_keys)
proc_ratios_sorted = sort(proc_keys, lt=(a,b)->proc_ratios[a]<proc_ratios[b])
progress = false
id = popfirst!(proc_ratios_sorted)
was_empty = isempty(state.ready)
while !isempty(state.ready)
if (state.worker_pressure[id] >= state.worker_capacity[id])
# TODO: provide forward progress guarantees at user's request
if isempty(proc_ratios_sorted)
break
end
id = popfirst!(proc_ratios_sorted)
end
if !pop_and_fire!(ctx, state, chan, OSProc(id))
# Internal scheduler gave up, so we force scheduling
task = pop!(state.ready)
fire_task!(ctx, task, OSProc(id), state, chan)
end
progress = true
end
if !isempty(state.ready)
# we still have work we can schedule, so oversubscribe with round-robin
# TODO: if we're going to oversubscribe, do it intelligently
for p in procs
isempty(state.ready) && break
progress |= pop_and_fire!(ctx, state, chan, p)
end
end
@assert was_empty || progress
return progress
end
function pop_and_fire!(ctx, state, chan, proc; immediate_next=false)
task = pop_with_affinity!(ctx, state.ready, proc, immediate_next)
if task !== nothing
fire_task!(ctx, task, proc, state, chan)
return true
end
return false
end

# Main responsibility of this function is to check if new procs have been pushed to the context
function assign_new_procs!(ctx, state, chan, node_order, assignedprocs=[])
function assign_new_procs!(ctx, state, chan, assignedprocs)
ps = procs_to_use(ctx)
# Must track individual procs to handle the case when procs are removed
for p in setdiff(ps, assignedprocs)
isempty(state.ready) && break
task = pop_with_affinity!(ctx, state.ready, p, false)
if task !== nothing
fire_task!(ctx, task, p, state, chan, node_order)
end
diffps = setdiff(ps, assignedprocs)
if !isempty(diffps)
schedule!(ctx, state, chan, diffps)
end
return ps
end
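The new `schedule!` above assigns ready thunks to the worker with the lowest pressure-to-capacity ratio, skips saturated workers while unsaturated ones remain, and only then oversubscribes round-robin. A standalone sketch of that selection rule, using hypothetical pressure and capacity snapshots rather than the scheduler's real state:

```julia
# Hypothetical snapshot: worker id => current pressure / total capacity.
worker_pressure = Dict(2 => 3, 3 => 1, 4 => 4)
worker_capacity = Dict(2 => 4, 3 => 4, 4 => 4)

# Sort workers least-loaded first, mirroring the sort in `schedule!`.
ratio(p) = worker_pressure[p] / worker_capacity[p]
by_load = sort(collect(keys(worker_pressure)), lt=(a, b) -> ratio(a) < ratio(b))

# Prefer the first unsaturated worker; fall back to the least-loaded one.
idx = findfirst(p -> worker_pressure[p] < worker_capacity[p], by_load)
target = by_load[something(idx, 1)]   # => 3 for this snapshot
```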
@@ -246,7 +293,7 @@ function pop_with_affinity!(ctx, tasks, proc, immediate_next)
return nothing
end

function fire_task!(ctx, thunk, proc, state, chan, node_order)
function fire_task!(ctx, thunk, proc, state, chan)
@logmsg("W$(proc.pid) + $thunk ($(showloc(thunk.f, length(thunk.inputs)))) input:$(thunk.inputs) cache:$(thunk.cache) $(thunk.cache_ref)")
push!(state.running, thunk)
if thunk.cache && thunk.cache_ref !== nothing
@@ -256,12 +303,9 @@ function fire_task!(ctx, thunk, proc, state, chan, node_order)
if data !== nothing
@logmsg("cache hit: $(thunk.cache_ref)")
state.cache[thunk] = data
immediate_next = finish_task!(state, thunk, node_order; free=false)
immediate_next = finish_task!(state, thunk; free=false)
if !isempty(state.ready)
thunk = pop_with_affinity!(ctx, state.ready, proc, immediate_next)
if thunk !== nothing
fire_task!(ctx, thunk, proc, state, chan, node_order)
end
pop_and_fire!(ctx, state, chan, proc; immediate_next=immediate_next)
end
return
else
@@ -284,21 +328,16 @@ function fire_task!(ctx, thunk, proc, state, chan, node_order)
end

@dbg timespan_start(ctx, :compute, thunk.id, thunk.f)
Threads.atomic_add!(ACTIVE_TASKS, 1)
res = thunk.f(fetched...)
Threads.atomic_sub!(ACTIVE_TASKS, 1)
@dbg timespan_end(ctx, :compute, thunk.id, (thunk.f, p, typeof(res), sizeof(res)))

#push!(state.running, thunk)
state.cache[thunk] = res
immediate_next = finish_task!(state, thunk, node_order; free=false)
immediate_next = finish_task!(state, thunk; free=false)
if !isempty(state.ready)
if immediate_next
thunk = pop!(state.ready)
else
thunk = pop_with_affinity!(ctx, state.ready, proc, immediate_next)
end
if thunk !== nothing
fire_task!(ctx, thunk, proc, state, chan, node_order)
end
pop_and_fire!(ctx, state, chan, proc; immediate_next=immediate_next)
end
return
end
@@ -309,18 +348,16 @@ function fire_task!(ctx, thunk, proc, state, chan, node_order)
state.thunk_dict[thunk.id] = thunk
toptions = thunk.options !== nothing ? thunk.options : ThunkOptions()
options = merge(ctx.options, toptions)
if options.single > 0
proc = OSProc(options.single)
end
state.worker_pressure[proc.pid] += 1
async_apply(proc, thunk.id, thunk.f, data, chan, thunk.get_result, thunk.persist, thunk.cache, options, ids, ctx.log_sink)
end

function finish_task!(state, node, node_order; free=true)
function finish_task!(state, node; free=true)
if istask(node) && node.cache
node.cache_ref = state.cache[node]
end
immediate_next = false
for dep in sort!(collect(state.dependents[node]), by=node_order)
for dep in sort!(collect(state.dependents[node]), by=state.node_order)
set = state.waiting[dep]
pop!(set, node)
if isempty(set)
@@ -359,7 +396,10 @@ function start_state(deps::Dict, node_order)
Vector{Thunk}(undef, 0),
Dict{Thunk, Any}(),
Set{Thunk}(),
Dict{Int, Thunk}()
Dict{Int, Thunk}(),
node_order,
Dict{Int,Int}(),
Dict{Int,Int}(),
)

nodes = sort(collect(keys(deps)), by=node_order)
@@ -403,16 +443,19 @@ end
task_local_storage(:processor, to_proc)

# Execute
Threads.atomic_add!(ACTIVE_TASKS, 1)
res = execute!(to_proc, f, fetched...)
Threads.atomic_sub!(ACTIVE_TASKS, 1)

# Construct result
(from_proc, thunk_id, send_result ? res : tochunk(res, to_proc; persist=persist, cache=persist ? true : cache)) #todo: add more metadata
send_result ? res : tochunk(res, to_proc; persist=persist, cache=persist ? true : cache)
catch ex
bt = catch_backtrace()
(from_proc, thunk_id, RemoteException(myid(), CapturedException(ex, bt)))
RemoteException(myid(), CapturedException(ex, bt))
end
@dbg timespan_end(ctx, :compute, thunk_id, (f, to_proc, typeof(res), sizeof(res)))
result_meta
metadata = (pressure=ACTIVE_TASKS[],)
(from_proc, thunk_id, result_meta, metadata)
end

@noinline function async_apply(p::OSProc, thunk_id, f, data, chan, send_res, persist, cache, options, ids, log_sink)
@@ -421,7 +464,7 @@
put!(chan, remotecall_fetch(do_task, p.pid, thunk_id, f, data, send_res, persist, cache, options, ids, log_sink))
catch ex
bt = catch_backtrace()
put!(chan, (p, thunk_id, CapturedException(ex, bt)))
put!(chan, (p, thunk_id, CapturedException(ex, bt), nothing))
end
nothing
end
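Each finished (or failed) task now flows back to the scheduler as a 4-tuple that carries a pressure sample alongside the result. An illustrative sketch of that message shape (field names taken from this diff; the channel and values here are made up):

```julia
using Distributed
import Dagger: OSProc

chan = Channel{Any}(32)

# Success case: (processor, thunk id, result-or-chunk, metadata NamedTuple).
put!(chan, (OSProc(2), 42, :some_result, (pressure=3,)))

proc, thunk_id, res, metadata = take!(chan)
if metadata !== nothing   # `async_apply` sends `nothing` when the remotecall itself fails
    # Scheduler-side bookkeeping: state.worker_pressure[proc.pid] = metadata.pressure
    @info "worker $(proc.pid) reported pressure $(metadata.pressure)"
end
```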
