Skip to content

Commit

Permalink
Merge 2b21bda into c46a395
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Nov 17, 2020
2 parents c46a395 + 2b21bda commit 726f8f5
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 169 deletions.
31 changes: 21 additions & 10 deletions src/chunks.jl
Expand Up @@ -32,13 +32,13 @@ domain(x::Any) = UnitDomain()
A chunk with some data
"""
mutable struct Chunk{T, H, P<:Processor}
chunktype::Type
chunktype::Type{T}
domain
handle::H
processor::P
persist::Bool
function (::Type{Chunk{T,H,P}})(chunktype, domain, handle, processor, persist) where {T,H,P}
c = new{T,H,P}(chunktype, domain, handle, processor, persist)
function (::Type{Chunk{T,H,P}})(::Type{T}, domain, handle, processor, persist) where {T,H,P}
c = new{T,H,P}(T, domain, handle, processor, persist)
finalizer(x -> @async(myid() == 1 && free!(x)), c)
c
end
Expand Down Expand Up @@ -80,10 +80,21 @@ end
# Collecting a DRef pulls its data to the calling worker by `move`-ing it
# from the owner's OSProc to the local OSProc. `options` is currently unused.
collect(ctx::Context, ref::DRef; options=nothing) =
move(OSProc(ref.owner), OSProc(), ref)
collect(ctx::Context, ref::FileRef; options=nothing) =
poolget(ref)
move(from_proc::OSProc, to_proc::OSProc, ref::Union{DRef, FileRef}) =
poolget(ref)

poolget(ref) # FIXME: Do move call

# Unwrap Chunk, DRef, and FileRef by default:
# a Chunk moves by moving its underlying handle, and a DRef/FileRef moves by
# fetching its value via `poolget` and moving the resulting data.
move(from_proc::Processor, to_proc::Processor, x::Chunk) =
move(from_proc, to_proc, x.handle)
move(from_proc::Processor, to_proc::Processor, x::Union{DRef,FileRef}) =
move(from_proc, to_proc, poolget(x))

# Determine from_proc when unspecified:
# a Chunk's source is the processor it records, a DRef's source is its owner's
# OSProc, and anything else is assumed to already be on the local OSProc.
move(to_proc::Processor, chunk::Chunk) =
move(chunk.processor, to_proc, chunk)
move(to_proc::Processor, d::DRef) =
move(OSProc(d.owner), to_proc, d)
move(to_proc::Processor, x) =
move(OSProc(), to_proc, x)

### ChunkIO
affinity(r::DRef) = Pair{OSProc, UInt64}[OSProc(r.owner) => r.size]
Expand All @@ -102,9 +113,9 @@ end
Create a chunk from sequential object `x` which resides on `proc`.
"""
function tochunk(x, proc::P=OSProc(); persist=false, cache=false) where P
function tochunk(x::X, proc::P=OSProc(); persist=false, cache=false) where {X,P}
ref = poolset(x, destroyonevict=persist ? false : cache)
Chunk{Any, typeof(ref), P}(typeof(x), domain(x), ref, proc, persist)
Chunk{X, typeof(ref), P}(X, domain(x), ref, proc, persist)
end
tochunk(x::Union{Chunk, Thunk}, proc=nothing) = x

Expand All @@ -129,7 +140,7 @@ function savechunk(data, dir, f)
return position(io)
end
fr = FileRef(f, sz)
Chunk{Any, typeof(fr), P}(typeof(data), domain(data), fr, OSProc(), true)
Chunk{typeof(data), typeof(fr), P}(typeof(data), domain(data), fr, OSProc(), true)
end


Expand Down
91 changes: 20 additions & 71 deletions src/processor.jl
Expand Up @@ -24,18 +24,18 @@ calls differently than normal Julia.
execute!

"""
iscompatible(proc::Processor, opts, f, args...) -> Bool
iscompatible(proc::Processor, opts, f, Targs...) -> Bool
Indicates whether `proc` can execute `f` over `args` given `opts`. `Processor`
Indicates whether `proc` can execute `f` over `Targs` given `opts`. `Processor`
subtypes should overload this function to return `true` if and only if it is
essentially guaranteed that `f(args...)` is supported. Additionally,
essentially guaranteed that `f(::Targs...)` is supported. Additionally,
`iscompatible_func` and `iscompatible_arg` can be overridden to determine
compatibility of `f` and `args` individually. The default implementation
compatibility of `f` and `Targs` individually. The default implementation
returns `false`.
"""
iscompatible(proc::Processor, opts, f, args...) =
iscompatible(proc::Processor, opts, f, Targs...) =
iscompatible_func(proc, opts, f) &&
all(x->iscompatible_arg(proc, opts, x), args)
all(x->iscompatible_arg(proc, opts, x), Targs)
iscompatible_func(proc::Processor, opts, f) = false
iscompatible_arg(proc::Processor, opts, x) = false

Expand Down Expand Up @@ -70,58 +70,10 @@ get_parent
Moves and/or converts `x` such that it's available and suitable for usage on
the `to_proc` processor. This function can be overloaded by `Processor`
subtypes to transport arguments and convert them to an appropriate form before
being used for execution. This is additionally called on thunk results when
moving back to `from_proc` before being serialized over the wire as needed.
The default implementation breaks a single `move` call down into a sequence of
`move` calls, and is not intended to be maximally efficient.
being used for execution. Subtypes of `Processor` wishing to implement efficient
data movement should provide implementations where `x::Chunk`.
"""
function move(from_proc::Processor, to_proc::Processor, x)
    # Fast path: no movement needed when source and destination coincide.
    if from_proc == to_proc
        return x
    end
    @debug "Initiating generic move"

    # Move up to the remote OSProc (the root processor on the worker that
    # owns `from_proc`).
    # FIX(review): the original @debug here interpolated `parent_proc` and
    # `grandparent_proc` before either was defined, which raises an
    # UndefVarError whenever debug logging is enabled.
    root = get_parent_osproc(from_proc)
    @debug "(Remote) moving $from_proc to $root"
    x, parent_proc = remotecall_fetch(move_to_osproc, root.pid, from_proc, x)

    # Move across the network to the local OSProc.
    remote_proc = parent_proc
    local_proc = OSProc()
    @debug "(Network) moving $remote_proc to $local_proc"
    x = move(remote_proc, local_proc, x)

    # Walk down the local processor tree from the local OSProc to `to_proc`,
    # issuing one single-hop `move` per level.
    parent_proc = get_parent(to_proc)
    path = Processor[to_proc, parent_proc]
    while parent_proc != local_proc && !(parent_proc isa OSProc)
        parent_proc = get_parent(parent_proc)
        push!(path, parent_proc)
    end
    last_proc = local_proc
    while !isempty(path)
        next_proc = pop!(path)
        @debug "(Local) moving $last_proc to $next_proc"
        x = move(last_proc, next_proc, x)
        last_proc = next_proc
    end
    return x
end
# Ascend the processor hierarchy from `proc` until an OSProc (the worker's
# root processor) is reached, and return it.
function get_parent_osproc(proc)
    current = proc
    while !(current isa OSProc)
        current = get_parent(current)
    end
    return current
end
# Repeatedly move `x` one level up the processor hierarchy on the owning
# worker until it resides on that worker's OSProc. Returns the moved value
# together with the OSProc it ended up on.
# FIX(review): removed the original's unused local `ctx = Context()` — it was
# never read and was dead code.
function move_to_osproc(parent_proc, x)
    while !(parent_proc isa OSProc)
        grandparent_proc = get_parent(parent_proc)
        x = move(parent_proc, grandparent_proc, x)
        parent_proc = grandparent_proc
    end
    return x, parent_proc
end
move(from_proc::Processor, to_proc::Processor, x) = x

"""
capacity(proc::Processor=OSProc()) -> Int
Expand Down Expand Up @@ -174,18 +126,19 @@ iscompatible_arg(proc::OSProc, opts, args...) =
proc.children)
get_processors(proc::OSProc) =
vcat((get_processors(child) for child in proc.children)...,)
function choose_processor(from_proc::OSProc, options, f, args)
if isempty(from_proc.queue)
for child in from_proc.children
function choose_processor(options, f, Targs)
osproc = OSProc()
if isempty(osproc.queue)
for child in osproc.children
grandchildren = get_processors(child)
append!(from_proc.queue, grandchildren)
append!(osproc.queue, grandchildren)
end
end
@assert !isempty(from_proc.queue)
for i in 1:length(from_proc.queue)
proc = popfirst!(from_proc.queue)
push!(from_proc.queue, proc)
if !iscompatible(proc, options, f, args...)
@assert !isempty(osproc.queue)
for i in 1:length(osproc.queue)
proc = popfirst!(osproc.queue)
push!(osproc.queue, proc)
if !iscompatible(proc, options, f, Targs...)
continue
end
if default_enabled(proc) && isempty(options.proctypes)
Expand All @@ -194,7 +147,7 @@ function choose_processor(from_proc::OSProc, options, f, args)
return proc
end
end
throw(ProcessorSelectionException(options.proctypes, from_proc.queue, f, args))
throw(ProcessorSelectionException(options.proctypes, osproc.queue, f, Targs))
end
struct ProcessorSelectionException <: Exception
proctypes::Vector{Type}
Expand All @@ -210,7 +163,6 @@ function Base.show(io::IO, pex::ProcessorSelectionException)
print(io, " Arguments: $(pex.args)")
end

# OSProc-to-OSProc moves are the identity here; network transport is handled
# elsewhere (serialization over the wire).
move(from_proc::OSProc, to_proc::OSProc, x) = x
# Executing on an OSProc is a plain function call on the current process.
execute!(proc::OSProc, f, args...) = f(args...)
# OSProc is always eligible for scheduling by default.
default_enabled(proc::OSProc) = true

Expand All @@ -226,8 +178,6 @@ end
# A ThreadProc runs ordinary Julia code, so it accepts any function and any
# arguments unconditionally.
iscompatible(proc::ThreadProc, opts, f, args...) = true
iscompatible_func(proc::ThreadProc, opts, f) = true
iscompatible_arg(proc::ThreadProc, opts, x) = true
move(from_proc::OSProc, to_proc::ThreadProc, x) = x
move(from_proc::ThreadProc, to_proc::OSProc, x) = x
@static if VERSION >= v"1.3.0-DEV.573"
execute!(proc::ThreadProc, f, args...) = fetch(Threads.@spawn begin
task_local_storage(:processor, proc)
Expand Down Expand Up @@ -280,7 +230,6 @@ Context(procs::Vector{P}=Processor[OSProc(w) for w in workers()];
profile=false, options=nothing) where {P<:Processor} =
Context(procs, proc_lock, log_sink, log_file, profile, options)
Context(xs::Vector{Int}) = Context(map(OSProc, xs))
Context() = Context([OSProc(w) for w in workers()])
# Return a snapshot of the context's processor list; the copy is taken while
# holding the context lock so concurrent mutation cannot tear it.
function procs(ctx::Context)
    return lock(ctx) do
        copy(ctx.procs)
    end
end
Expand Down
74 changes: 40 additions & 34 deletions src/scheduler.jl
Expand Up @@ -3,7 +3,7 @@ module Sch
using Distributed
import MemPool: DRef

import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, choose_processor, execute!, rmprocs!, addprocs!
import ..Dagger: Context, Processor, Thunk, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, choose_processor, execute!, rmprocs!, addprocs!

include("fault-handler.jl")

Expand Down Expand Up @@ -146,14 +146,15 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
end
end

proc, thunk_id, res, metadata = take!(chan) # get result of completed thunk
pid, thunk_id, (res, metadata) = take!(chan) # get result of completed thunk
proc = OSProc(pid)
lock(newtasks_lock) # This waits for any assign_new_procs! above to complete and then shuts down the task
if isa(res, CapturedException) || isa(res, RemoteException)
if check_exited_exception(res)
@warn "Worker $(proc.pid) died on thunk $thunk_id, rescheduling work"

# Remove dead worker from procs list
remove_dead_proc!(ctx, proc)
remove_dead_proc!(ctx, state, proc)

handle_fault(ctx, state, state.thunk_dict[thunk_id], proc, chan)
continue
Expand All @@ -163,7 +164,6 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
end
state.worker_pressure[proc.pid] = metadata.pressure
node = state.thunk_dict[thunk_id]
@logmsg("WORKER $(proc.pid) - $node ($(node.f)) input:$(node.inputs)")
state.cache[node] = res

@dbg timespan_start(ctx, :scheduler, thunk_id, master)
Expand All @@ -173,7 +173,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
end
@dbg timespan_end(ctx, :scheduler, thunk_id, master)
end
state.cache[d]
state.cache[d] # TODO: move(OSProc(), state.cache[d])
end

function procs_to_use(ctx, options=ctx.options)
Expand Down Expand Up @@ -236,6 +236,13 @@ function assign_new_procs!(ctx, state, chan, assignedprocs)
# Must track individual procs to handle the case when procs are removed
diffps = setdiff(ps, assignedprocs)
if !isempty(diffps)
for p in diffps
state.worker_pressure[p.pid] = 0
state.worker_capacity[p.pid] = 1
@async begin
state.worker_capacity[p.pid] = remotecall_fetch(capacity, p.pid)
end
end
schedule!(ctx, state, chan, diffps)
end
return ps
Expand All @@ -244,9 +251,11 @@ end
# Might be a good policy to not remove the proc if immediate_next
shall_remove_proc(ctx, proc) = proc procs_to_use(ctx)

function remove_dead_proc!(ctx, proc, options=ctx.options)
# Remove a dead (exited) worker `proc` from the context and scrub its
# scheduler bookkeeping so no further work is assigned to it.
# NOTE(review): `@assert` can be compiled out at higher optimization levels;
# consider an explicit `error`/`throw` for this user-facing failure check.
function remove_dead_proc!(ctx, state, proc, options=ctx.options)
@assert options.single !== proc.pid "Single worker failed, cannot continue."
rmprocs!(ctx, [proc])
# Drop the dead worker's pressure/capacity tracking entries.
delete!(state.worker_pressure, proc.pid)
delete!(state.worker_capacity, proc.pid)
end

function pop_with_affinity!(ctx, tasks, proc, immediate_next)
Expand Down Expand Up @@ -301,16 +310,16 @@ function fire_task!(ctx, thunk, proc, state, chan)
data = unrelease(thunk.cache_ref) # ask worker to keep the data around
# till this compute cycle frees it
if data !== nothing
@logmsg("cache hit: $(thunk.cache_ref)")
# cache hit
state.cache[thunk] = data
immediate_next = finish_task!(state, thunk; free=false)
if !isempty(state.ready)
pop_and_fire!(ctx, state, chan, proc; immediate_next=immediate_next)
end
return
else
# cache miss
thunk.cache_ref = nothing
@logmsg("cache miss: $(thunk.cache_ref) recomputing $(thunk)")
end
end

Expand All @@ -320,20 +329,22 @@ function fire_task!(ctx, thunk, proc, state, chan)
if thunk.meta
# Run it on the parent node, do not move data.
p = OSProc(myid())
fetched = map(Iterators.zip(thunk.inputs,ids)) do (x, id)
@dbg timespan_start(ctx, :comm, (thunk.id, id), (thunk.f, id))
x = istask(x) ? state.cache[x] : x
@dbg timespan_end(ctx, :comm, (thunk.id, id), (thunk.f, id))
return x
end
fetched = fetch.(map(Iterators.zip(thunk.inputs,ids)) do (x, id)
@async begin
@dbg timespan_start(ctx, :comm, (thunk.id, id), (thunk.f, id))
x = istask(x) ? state.cache[x] : x
@dbg timespan_end(ctx, :comm, (thunk.id, id), (thunk.f, id))
return x
end
end)

@dbg timespan_start(ctx, :compute, thunk.id, thunk.f)
Threads.atomic_add!(ACTIVE_TASKS, 1)
res = thunk.f(fetched...)
Threads.atomic_sub!(ACTIVE_TASKS, 1)
@dbg timespan_end(ctx, :compute, thunk.id, (thunk.f, p, typeof(res), sizeof(res)))

#push!(state.running, thunk)
# TODO: push!(state.running, thunk) (when the scheduler becomes multithreaded)
state.cache[thunk] = res
immediate_next = finish_task!(state, thunk; free=false)
if !isempty(state.ready)
Expand Down Expand Up @@ -420,22 +431,17 @@ end

@noinline function do_task(thunk_id, f, data, send_result, persist, cache, options, ids, log_sink)
ctx = Context(Processor[]; log_sink=log_sink)
from_proc = OSProc()
fetched = map(Iterators.zip(data,ids)) do (x, id)
@dbg timespan_start(ctx, :comm, (thunk_id, id), (f, id))
x = x isa Union{Chunk,Thunk} ? collect(ctx, x) : x
@dbg timespan_end(ctx, :comm, (thunk_id, id), (f, id))
return x
end

# TODO: Time choose_processor?
to_proc = choose_processor(from_proc, options, f, fetched)
fetched = map(Iterators.zip(fetched,ids)) do (x, id)
@dbg timespan_start(ctx, :move, (thunk_id, id), (f, id))
x = move(from_proc, to_proc, x)
@dbg timespan_end(ctx, :move, (thunk_id, id), (f, id))
return x
end
# TODO: Time choose_processor
Tdata = map(x->x isa Chunk ? chunktype(x) : x, data)
to_proc = choose_processor(options, f, Tdata)
fetched = fetch.(map(Iterators.zip(data,ids)) do (x, id)
@async begin
@dbg timespan_start(ctx, :move, (thunk_id, id), (f, id))
x = move(to_proc, x)
@dbg timespan_end(ctx, :move, (thunk_id, id), (f, id))
return x
end
end)
@dbg timespan_start(ctx, :compute, thunk_id, f)
res = nothing
result_meta = try
Expand All @@ -455,16 +461,16 @@ end
end
@dbg timespan_end(ctx, :compute, thunk_id, (f, to_proc, typeof(res), sizeof(res)))
metadata = (pressure=ACTIVE_TASKS[],)
(from_proc, thunk_id, result_meta, metadata)
(result_meta, metadata)
end

@noinline function async_apply(p::OSProc, thunk_id, f, data, chan, send_res, persist, cache, options, ids, log_sink)
@async begin
try
put!(chan, remotecall_fetch(do_task, p.pid, thunk_id, f, data, send_res, persist, cache, options, ids, log_sink))
put!(chan, (p.pid, thunk_id, remotecall_fetch(do_task, p.pid, thunk_id, f, data, send_res, persist, cache, options, ids, log_sink)))
catch ex
bt = catch_backtrace()
put!(chan, (p, thunk_id, CapturedException(ex, bt), nothing))
put!(chan, (p.pid, thunk_id, (CapturedException(ex, bt), nothing)))
end
nothing
end
Expand Down

0 comments on commit 726f8f5

Please sign in to comment.