Skip to content

Commit

Permalink
Merge d30da25 into 5100fb5
Browse files Browse the repository at this point in the history
  • Loading branch information
jpsamaroo committed Apr 7, 2020
2 parents 5100fb5 + d30da25 commit 941219b
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 49 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ top_node = delayed(group)(head_nodes...)
compute(top_node)
```

## Scheduler and Thunk options

While Dagger generally "just works", sometimes one needs to exert some more
fine-grained control over how the scheduler allocates work. There are two
parallel mechanisms to achieve this: Scheduler options (from
`Dagger.Sch.SchedulerOptions`) and Thunk options (from
`Dagger.Sch.ThunkOptions`). These two options structs generally contain the
same options, with the difference being that Scheduler options operate
globally across an entire DAG, and Thunk options operate on a thunk-by-thunk
basis. Scheduler options can be constructed and passed to `collect()` or
`compute()` as the keyword argument `options`, and Thunk options can be passed
to Dagger's `delayed` function similarly: `delayed(myfunc)(arg1, arg2, ...;
options=opts)`. Check the docstring for the two options structs to see what
options are available.

## Processors and Resource control

By default, Dagger uses the CPU to process work, typically single-threaded per
cluster node. However, Dagger allows access to a wider range of hardware and
software acceleration techniques, such as multithreading and GPUs. These more
advanced (but performant) accelerators are disabled by default, but can easily
be enabled by using the `proctypes` field of the Scheduler/Thunk options. If
non-empty, only the processor types contained in `options.proctypes` will be
used to compute all thunks (when set as a Scheduler option) or a given thunk
(when set as a Thunk option).

### GPU Processors

The [DaggerGPU.jl](https://github.com/jpsamaroo/DaggerGPU.jl) package can be
imported to enable GPU acceleration for NVIDIA and AMD GPUs, when available.
The processors provided by that package are not enabled by default, but may be
enabled via `options.proctypes` as usual.

## Rough high level description of scheduling

- First picks the leaf Thunks and distributes them to available workers. Each worker is given at most 1 task at a time. If input to the node is a `Chunk`, then workers which already have the chunk are preferred.
Expand Down
10 changes: 10 additions & 0 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,14 @@ include("array/sort.jl")

include("ui/graph.jl")

function __init__()
    # Register callbacks that give each worker's OSProc one ThreadProc per
    # Julia thread (true task threading requires Julia >= 1.3; older
    # versions get a single pseudo-thread processor).
    @static if VERSION >= v"1.3.0-DEV.573"
        foreach(1:Threads.nthreads()) do tid
            push!(PROCESSOR_CALLBACKS, _ -> ThreadProc(myid(), tid))
        end
    else
        push!(PROCESSOR_CALLBACKS, _ -> ThreadProc(myid(), 1))
    end
end

end # module
44 changes: 28 additions & 16 deletions src/chunks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ domain(x::Any) = UnitDomain()
"""
A chunk with some data
"""
mutable struct Chunk{T, H}
mutable struct Chunk{T, H, P<:Processor}
chunktype::Type
domain
handle::H
processor::P
persist::Bool
function (::Type{Chunk{T,H}})(chunktype, domain, handle, persist) where {T,H}
c = new{T,H}(chunktype, domain, handle, persist)
function (::Type{Chunk{T,H,P}})(chunktype, domain, handle, processor, persist) where {T,H,P}
c = new{T,H,P}(chunktype, domain, handle, processor, persist)
finalizer(x -> @async(myid() == 1 && free!(x)), c)
c
end
Expand All @@ -49,7 +50,7 @@ persist!(t::Chunk) = (t.persist=true; t)
shouldpersist(p::Chunk) = t.persist
affinity(c::Chunk) = affinity(c.handle)

function unrelease(c::Chunk{T,DRef}) where T
function unrelease(c::Chunk{T,DRef,P}) where {T,P}
# set spilltodisk = true if data is still around
try
destroyonevict(c.handle, false)
Expand All @@ -64,16 +65,27 @@ function unrelease(c::Chunk{T,DRef}) where T
end
unrelease(c::Chunk) = c

# Fetch `chunk`'s data from the pool on its owner and move it from its home
# processor to that worker's OSProc, ready for transport over the wire.
function collect_remote(chunk::Chunk)
    data = poolget(chunk.handle)
    return move(Context(), chunk.processor, OSProc(), data)
end
"""
    collect(ctx::Context, chunk::Chunk; options=nothing)

Fetch `chunk`'s data to the local OS process, moving it off of its owning
processor (and worker) as needed.
"""
function collect(ctx::Context, chunk::Chunk; options=nothing)
    # Fix: removed stale diff-leftover lines (a bare `collect(ctx,
    # chunk.handle)` call preceding the `if`) from the merged body.
    if chunk.handle isa DRef && !(chunk.processor isa OSProc)
        # Data lives on a non-CPU processor, possibly on another worker:
        # fetch and normalize it on the owner before transporting it here.
        return remotecall_fetch(collect_remote, chunk.handle.owner, chunk)
    elseif chunk.handle isa FileRef
        # File-backed data: just load it from the pool.
        return poolget(chunk.handle)
    else
        return move(ctx, chunk.processor, OSProc(), chunk.handle)
    end
end
# Collecting a bare DRef moves the value from the owning worker's OSProc to
# the local OSProc.
collect(ctx::Context, ref::DRef; options=nothing) =
    move(ctx, OSProc(ref.owner), OSProc(), ref)
# File-backed references are loaded straight from the pool.
collect(ctx::Context, ref::FileRef; options=nothing) =
    poolget(ref)
# Moving a pool reference between OS processes resolves it to its value.
move(ctx, from_proc::OSProc, to_proc::OSProc, ref::Union{DRef, FileRef}) =
    poolget(ref)


### ChunkIO
# NOTE(review): this method appears redundant with the DRef/FileRef-specific
# `collect` methods defined above — presumably a merge leftover; verify
# before removing.
function collect(ctx::Context, ref::Union{DRef, FileRef}; options=nothing)
    poolget(ref)
end
# A DRef's affinity is its owning worker, weighted by the data's size (bytes).
affinity(r::DRef) = Pair{OSProc, UInt64}[OSProc(r.owner) => r.size]
function affinity(r::FileRef)
if haskey(MemPool.who_has_read, r.file)
Expand All @@ -85,7 +97,7 @@ function affinity(r::FileRef)
end
end

function Serialization.deserialize(io::AbstractSerializer, dt::Type{Chunk{T,H}}) where {T,H}
function Serialization.deserialize(io::AbstractSerializer, dt::Type{Chunk{T,H,P}}) where {T,H,P}
nf = fieldcount(dt)
c = ccall(:jl_new_struct_uninit, Any, (Any,), dt)
Serialization.deserialize_cycle(io, c)
Expand All @@ -100,19 +112,19 @@ function Serialization.deserialize(io::AbstractSerializer, dt::Type{Chunk{T,H}})
end

"""
    tochunk(x, proc=OSProc(); persist=false, cache=false) -> Chunk

Create a chunk from sequential object `x` which resides on processor `proc`.
"""
function tochunk(x, proc::P=OSProc(); persist=false, cache=false) where P
    # Fix: removed interleaved old/new duplicate lines left over from the
    # merged diff (duplicate docstring signature, function header, and
    # `Chunk{...}` construction).
    # Persisted data must never be destroyed on eviction.
    ref = poolset(x, destroyonevict=persist ? false : cache)
    Chunk{Any, typeof(ref), P}(typeof(x), domain(x), ref, proc, persist)
end
tochunk(x::Union{Chunk, Thunk}) = x
tochunk(x::Union{Chunk, Thunk}, proc=nothing) = x

# Check to see if the node is set to persist
# if it is, `force` can override it
function free!(s::Chunk{X, DRef}; force=true, cache=false) where X
function free!(s::Chunk{X, DRef, P}; force=true, cache=false) where {X,P}
if force || !s.persist
if cache
try
Expand All @@ -134,7 +146,7 @@ function savechunk(data, dir, f)
return position(io)
end
fr = FileRef(f, sz)
Chunk{Any, typeof(fr)}(typeof(data), domain(data), fr, true)
Chunk{Any, typeof(fr), P}(typeof(data), domain(data), fr, OSProc(), true)
end


Expand Down
203 changes: 199 additions & 4 deletions src/processor.jl
Original file line number Diff line number Diff line change
@@ -1,19 +1,211 @@
export OSProc, Context

"""
    Processor

An abstract type representing a processing device and associated memory, where
data can be stored and operated on. Subtypes should be immutable, and
instances should compare equal if they represent the same logical processing
device/memory. Subtype instances should be serializable between different
nodes. Subtype instances may contain a pointer to a "parent" `Processor` to
make it easy to transfer data to/from other types of `Processor` at runtime.
"""
abstract type Processor end

# Callbacks run by `get_osproc` to produce child processors for an `OSProc`
# (a callback may return `nothing` to add no child).
# NOTE(review): untyped `[]` — presumably only functions are pushed here;
# consider `Function[]` after verifying no non-`Function` callables are used.
const PROCESSOR_CALLBACKS = []

"""
    execute!(proc::Processor, f, args...) -> Any

Executes the function `f` with arguments `args` on processor `proc`. This
function can be overloaded by `Processor` subtypes to allow executing function
calls differently than normal Julia.
"""
execute!

"""
    iscompatible(proc::Processor, opts, f, args...) -> Bool

Indicates whether `proc` can execute `f` over `args` given `opts`. `Processor`
subtypes should overload this function to return `true` if and only if it is
essentially guaranteed that `f(args...)` is supported. Additionally,
`iscompatible_func` and `iscompatible_arg` can be overridden to determine
compatibility of `f` and `args` individually. The default implementation
returns `false`.
"""
function iscompatible(proc::Processor, opts, f, args...)
    iscompatible_func(proc, opts, f) || return false
    return all(arg -> iscompatible_arg(proc, opts, arg), args)
end
# By default nothing is compatible; subtypes opt in per-function/per-argument.
iscompatible_func(proc::Processor, opts, f) = false
iscompatible_arg(proc::Processor, opts, x) = false

"""
    default_enabled(proc::Processor) -> Bool

Returns whether processor `proc` is enabled by default (opt-out). `Processor`
subtypes can override this function to make themselves opt-in (default
returns `false`).
"""
# Fix: removed a stray leftover docstring line ("OS process - contains pid
# ...") that sat between the docstring and the definition, which is not
# valid Julia.
default_enabled(proc::Processor) = false

"""
    get_processors(proc::Processor) -> Vector{T} where T<:Processor

Returns the full list of processors contained in `proc`, if any. `Processor`
subtypes should overload this function if they can contain sub-processors. The
default method will return a `Vector` containing `proc` itself.
"""
get_processors(proc::Processor) = Processor[proc]

"""
    get_parent(proc::Processor) -> Processor

Returns the parent processor for `proc`. The ultimate parent processor is an
`OSProc`. `Processor` subtypes should overload this to return their most
direct parent.
"""
get_parent

"""
    move(from_proc::Processor, to_proc::Processor, x)

Moves and/or converts `x` such that it's available and suitable for usage on
the `to_proc` processor. This function can be overloaded by `Processor`
subtypes to transport arguments and convert them to an appropriate form before
being used for execution. This is additionally called on thunk results when
moving back to `from_proc` before being serialized over the wire as needed.
The default implementation breaks a single `move` call down into a sequence of
`move` calls, and is not intended to be maximally efficient.
"""
function move(ctx, from_proc::Processor, to_proc::Processor, x)
    @debug "Initiating generic move"
    # Walk up from `from_proc` to its owning OSProc, moving `x` one level
    # at a time.
    parent_proc = from_proc
    while !(parent_proc isa OSProc)
        # FIXME: Initiate this chain on remote side
        grandparent_proc = get_parent(parent_proc)
        @debug "(Remote) moving $parent_proc to $grandparent_proc"
        x = move(ctx, parent_proc, grandparent_proc, x)
        parent_proc = grandparent_proc
    end

    # Hop from the remote OSProc to the local OSProc (the network transfer).
    remote_proc = parent_proc
    local_proc = OSProc()
    @debug "(Network) moving $remote_proc to $local_proc"
    x = move(ctx, remote_proc, local_proc, x)

    # Build the chain of parents from `to_proc` up to the local OSProc...
    # NOTE(review): assumes repeated `get_parent` eventually reaches the
    # local OSProc; if `to_proc` is itself an OSProc this calls
    # `get_parent(::OSProc)` — verify that method exists.
    parent_proc = get_parent(to_proc)
    path = Processor[to_proc, parent_proc]
    while parent_proc != local_proc
        parent_proc = get_parent(parent_proc)
        push!(path, parent_proc)
    end
    # ...then walk back down the chain, moving `x` one level at a time
    # until it reaches `to_proc`.
    last_proc = local_proc
    while !isempty(path)
        next_proc = pop!(path)
        @debug "(Local) moving $last_proc to $next_proc"
        x = move(ctx, last_proc, next_proc, x)
        last_proc = next_proc
    end
    return x
end

"""
    OSProc <: Processor

Julia CPU (OS) process, identified by Distributed pid. Executes thunks when
threads or other "children" processors are not available, and/or the user has
not opted to use those processors.
"""
struct OSProc <: Processor
    pid::Int                      # Distributed worker pid
    attrs::Dict{Symbol,Any}       # arbitrary per-process attributes
    children::Vector{Processor}   # child processors produced by callbacks
    queue::Vector{Processor}      # round-robin queue used by choose_processor
end
# Cache of constructed OSProcs, keyed by pid; emptied by `add_callback!`.
const OSPROC_CACHE = Dict{Int,OSProc}()
# Construct (or fetch from cache) the OSProc for `pid`, building its children
# on the remote worker itself so callbacks run in the right process.
OSProc(pid::Int=myid()) = get!(OSPROC_CACHE, pid) do
    remotecall_fetch(get_osproc, pid, pid)
end
# Build the OSProc for `pid`, asking each registered callback for an
# (optional) child processor. A failing callback is logged and skipped.
function get_osproc(pid::Int)
    proc = OSProc(pid, Dict{Symbol,Any}(), Processor[], Processor[])
    for callback in PROCESSOR_CALLBACKS
        child = try
            callback(proc)
        catch err
            @error "Error in processor callback" exception=(err,catch_backtrace())
            nothing
        end
        child === nothing || push!(proc.children, child)
    end
    proc
end
# Register a new processor callback and invalidate the OSProc cache so that
# subsequently-constructed OSProcs pick up the new callback's children.
# NOTE(review): already-constructed OSProc/Context objects keep their old
# children lists — confirm this is acceptable to callers.
function add_callback!(func)
    push!(Dagger.PROCESSOR_CALLBACKS, func)
    empty!(OSPROC_CACHE)
end
# Two OSProcs are equal iff they refer to the same worker pid.
# NOTE(review): `Base.hash` is not specialized to match this `==`; Sets/Dicts
# keyed by OSProc may misbehave — verify before relying on them.
Base.:(==)(proc1::OSProc, proc2::OSProc) = proc1.pid == proc2.pid
function iscompatible(proc::OSProc, opts, f, args...)
    for child in proc.children
        if iscompatible(child, opts, f, args...)
            return true
        end
    end
    # NOTE(review): falls through to `true`, so an OSProc claims
    # compatibility even when no child is compatible (making the loop above
    # have no effect on the result) — confirm this fallback is intended.
    return true
end
# Flatten the processor lists of all children into one vector.
# Fix: the generator must be splatted into `vcat` — passing it as a single
# argument does not concatenate the per-child vectors. The `Processor[]`
# init also handles an OSProc with no children.
get_processors(proc::OSProc) =
    vcat(Processor[], (get_processors(child) for child in proc.children)...)
# Pick a processor on `from_proc` that is compatible with `f(args...)` and
# allowed by `options.proctypes`, cycling through processors round-robin.
function choose_processor(from_proc::OSProc, options, f, args)
    # Lazily populate the round-robin queue from all children's processors.
    if isempty(from_proc.queue)
        for child in from_proc.children
            grandchildren = get_processors(child)
            append!(from_proc.queue, grandchildren)
        end
    end
    @assert !isempty(from_proc.queue)
    # Rotate the queue: pop from the front, push to the back, so repeated
    # calls spread work across processors.
    for i in 1:length(from_proc.queue)
        proc = popfirst!(from_proc.queue)
        push!(from_proc.queue, proc)
        if !iscompatible(proc, options, f, args...)
            continue
        end
        # Accept processors that are enabled by default (when no proctypes
        # filter is set), or explicitly allowed by `options.proctypes`.
        if default_enabled(proc) && isempty(options.proctypes)
            return proc
        elseif any(p->proc isa p, options.proctypes)
            return proc
        end
    end
    # NOTE(review): when no processor matches, this logs and implicitly
    # returns `nothing` — confirm callers handle a `nothing` result.
    @error "($(myid())) Exhausted all available processor types!" proctypes=options.proctypes procsavail=from_proc.queue args=args
end
# A value already in an OS process needs no conversion between OSProcs here.
move(ctx, from_proc::OSProc, to_proc::OSProc, x) = x
# An OSProc executes thunks as plain function calls.
execute!(proc::OSProc, f, args...) = f(args...)
# OSProc is always enabled as the fallback executor.
default_enabled(proc::OSProc) = true

"""
    ThreadProc <: Processor

Julia CPU (OS) thread, identified by Julia thread ID.
"""
# Fix: removed a stray leftover line ("A context represents a set of
# processors...") interleaved into this docstring by the merged diff.
struct ThreadProc <: Processor
    owner::Int   # pid of the owning OS process
    tid::Int     # Julia thread ID within that process
end
iscompatible(proc::ThreadProc, opts, f, args...) = true
# Threads share memory with their OS process; moves are the identity.
move(ctx, from_proc::OSProc, to_proc::ThreadProc, x) = x
move(ctx, from_proc::ThreadProc, to_proc::OSProc, x) = x
@static if VERSION >= v"1.3.0-DEV.573"
    execute!(proc::ThreadProc, f, args...) = fetch(Threads.@spawn f(args...))
else
    # TODO: Use Threads.@threads?
    execute!(proc::ThreadProc, f, args...) = fetch(@async f(args...))
end
get_parent(proc::ThreadProc) = OSProc(proc.owner)
default_enabled(proc::ThreadProc) = true

# TODO: ThreadGroupProc?

"A context represents a set of processors to use for an operation."
mutable struct Context
procs::Vector{OSProc}
procs::Vector{Processor}
log_sink::Any
profile::Bool
options
Expand All @@ -36,7 +228,10 @@ function Context(xs)
Context(xs, NoOpLog(), false, nothing) # By default don't log events
end
# Build a Context from worker pids.
Context(xs::Vector{Int}) = Context(map(OSProc, xs))

"""
    Context()

Create a `Context` with an `OSProc` for each available worker process.
"""
# Fix: removed the superseded `Context(;nthreads=...)` keyword constructor
# line left over from the merged diff, whose zero-argument method conflicted
# with this definition.
function Context()
    procs = [OSProc(w) for w in workers()]
    Context(procs)
end
procs(ctx::Context) = ctx.procs

"""
Expand Down

0 comments on commit 941219b

Please sign in to comment.