Skip to content

Commit

Permalink
Merge 01834b3 into 4937a56
Browse files Browse the repository at this point in the history
  • Loading branch information
visr committed Feb 16, 2020
2 parents 4937a56 + 01834b3 commit 547f2ad
Show file tree
Hide file tree
Showing 16 changed files with 157 additions and 122 deletions.
3 changes: 0 additions & 3 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ include("lib/logging.jl")
# Distributed data
include("processor.jl")
include("thunk.jl")
include("domain.jl")
include("chunks.jl")

# Task scheduling
Expand All @@ -41,8 +40,6 @@ include("array/matrix.jl")
include("array/sparse_partition.jl")
include("array/sort.jl")

include("array/show.jl")

include("ui/graph.jl")

end # module
55 changes: 28 additions & 27 deletions src/array/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ function getindex(a::ArrayDomain, b::ArrayDomain)
end

"""
alignfirst(a)
alignfirst(a) -> ArrayDomain
Make a subdomain a standalone domain. For example,
Make a subdomain a standalone domain.
alignfirst(ArrayDomain(11:25, 21:100))
# => ArrayDomain((1:15), (1:80))
# Example
```julia-repl
julia> alignfirst(ArrayDomain(11:25, 21:100))
ArrayDomain((1:15), (1:80))
```
"""
alignfirst(a::ArrayDomain) =
ArrayDomain(map(r->1:length(r), indexes(a)))
Expand All @@ -59,7 +62,11 @@ ndims(a::ArrayDomain) = length(size(a))
isempty(a::ArrayDomain) = length(a) == 0


"The domain of an array is a ArrayDomain"
"""
domain(x::AbstractArray) -> ArrayDomain
The domain of an array is an ArrayDomain.
"""
domain(x::AbstractArray) = ArrayDomain([1:l for l in size(x)])


Expand All @@ -85,23 +92,26 @@ function Base.show(io::IO, x::ArrayOp)
end

"""
`DArray{T,N,F}(domain, subdomains, chunks, concat)`
DArray{T,N,F}(domain, subdomains, chunks, concat)
DArray(T, domain, subdomains, chunks, [concat=cat])
An N-dimensional distributed array of element type T.
An N-dimensional distributed array of element type T, with a concatenation function of type F.
- `domain`: the whole ArrayDomain of the array
- `subdomains`: a `DomainBlocks` of the same dimensions as the array
- `chunks`: an array of chunks of dimension N
- `concat`: a function of type `F`. `concat(x, y; dims=d)` takes two chunks `x` and `y`
and concatenates them along dimension `d`. `cat` is used by default.
# Arguments
- `T`: element type
- `domain::ArrayDomain{N}`: the whole ArrayDomain of the array
- `subdomains::AbstractArray{ArrayDomain{N}, N}`: a `DomainBlocks` of the same dimensions as the array
- `chunks::AbstractArray{Union{Chunk,Thunk}, N}`: an array of chunks of dimension N
- `concat::F`: a function of type `F`. `concat(x, y; dims=d)` takes two chunks `x` and `y`
and concatenates them along dimension `d`. `cat` is used by default.
"""
mutable struct DArray{T,N,F} <: ArrayOp{T, N}
domain::ArrayDomain{N}
subdomains::AbstractArray{ArrayDomain{N}, N}
chunks::AbstractArray{Union{Chunk,Thunk}, N}
concat::F
freed::Threads.Atomic{UInt8}
function DArray{T,N,F}(domain, subdomains, chunks, concat) where {T, N,F}
function DArray{T,N,F}(domain, subdomains, chunks, concat::Function) where {T, N,F}
new(domain, subdomains, chunks, concat, Threads.Atomic{UInt8}(0))
end
end
Expand All @@ -126,16 +136,6 @@ end
# mainly for backwards-compatibility
DArray{T, N}(domain, subdomains, chunks) where {T,N} = DArray(T, domain, subdomains, chunks)


"""
`DArray(T, domain, subdomains, chunks, [concat=cat])`
Creates a distributed array of element type T.
- `T`: element type
rest of the arguments are the same as the DArray constructor.
"""
function DArray(T, domain::ArrayDomain{N},
subdomains::AbstractArray{ArrayDomain{N}, N},
chunks::AbstractArray{<:Any, N}, concat=cat) where N
Expand Down Expand Up @@ -237,7 +237,7 @@ end
A DArray object may contain a thunk in it, in which case
we first turn it into a Thunk object and then compute it.
"""
function compute(ctx, x::DArray; persist=true, options=nothing)
function compute(ctx::Context, x::DArray; persist=true, options=nothing)
thunk = thunkize(ctx, x, persist=persist)
if isa(thunk, Thunk)
compute(ctx, thunk; options=options)
Expand All @@ -249,7 +249,7 @@ end
"""
If a DArray tree has a Thunk in it, make the whole thing a big thunk
"""
function thunkize(ctx, c::DArray; persist=true)
function thunkize(ctx::Context, c::DArray; persist=true)
if any(istask, chunks(c))
thunks = chunks(c)
sz = size(thunks)
Expand All @@ -269,6 +269,7 @@ function thunkize(ctx, c::DArray; persist=true)
end

global _stage_cache = WeakKeyDict{Context, Dict}()

"""
A memoized version of stage. It is important that the
tasks generated for the same DArray have the same
Expand All @@ -279,7 +280,7 @@ identity, for example:
must not result in computation of A twice.
"""
function cached_stage(ctx, x)
function cached_stage(ctx::Context, x)
cache = if !haskey(_stage_cache, ctx)
_stage_cache[ctx] = Dict()
else
Expand Down Expand Up @@ -318,7 +319,7 @@ Base.@deprecate BlockPartition Blocks
Distribute(p::Blocks, data::AbstractArray) =
Distribute(partition(p, domain(data)), data)

function stage(ctx, d::Distribute)
function stage(ctx::Context, d::Distribute)
if isa(d.data, ArrayOp)
# distributing a dsitributed array
x = cached_stage(ctx, d.data)
Expand Down
4 changes: 2 additions & 2 deletions src/array/getindex.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ end
GetIndex(input::ArrayOp, idx::Tuple) =
GetIndex{eltype(input), ndims(input)}(input, idx)

function stage(ctx, gidx::GetIndex)
function stage(ctx::Context, gidx::GetIndex)
inp = cached_stage(ctx, gidx.input)

dmn = domain(inp)
Expand All @@ -31,7 +31,7 @@ struct GetIndexScalar <: Computation
idx::Tuple
end

function stage(ctx, gidx::GetIndexScalar)
function stage(ctx::Context, gidx::GetIndexScalar)
inp = cached_stage(ctx, gidx.input)
s = view(inp, ArrayDomain(gidx.idx))
delayed(identity)(collect(s)[1])
Expand Down
10 changes: 5 additions & 5 deletions src/array/map-reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ size(x::Map) = size(x.inputs[1])

Map(f, inputs::Tuple) = Map{Any, ndims(inputs[1])}(f, inputs)

function stage(ctx, node::Map)
function stage(ctx::Context, node::Map)
inputs = Any[cached_stage(ctx, n) for n in node.inputs]
primary = inputs[1] # all others will align to this guy
domains = domainchunks(primary)
Expand All @@ -38,7 +38,7 @@ struct ReduceBlock <: Computation
get_result::Bool
end

function stage(ctx, r::ReduceBlock)
function stage(ctx::Context, r::ReduceBlock)
inp = stage(ctx, r.input)
reduced_parts = map(x -> Thunk(r.op, x; get_result=r.get_result), chunks(inp))
Thunk((xs...) -> r.op_master(xs), reduced_parts...; meta=true)
Expand All @@ -51,7 +51,7 @@ reduceblock(f, x::ArrayOp) = compute(reduceblock_async(f, x))
reduceblock(f, g::Function, x::ArrayOp) =
compute(reduceblock_async(f, g, x))

reduce_async(f, x::ArrayOp) = reduceblock_async(xs->reduce(f,xs), xs->reduce(f,xs), x)
reduce_async(f::Function, x::ArrayOp) = reduceblock_async(xs->reduce(f,xs), xs->reduce(f,xs), x)

sum(x::ArrayOp; dims::Union{Int,Nothing} = nothing) = _sum(x, dims)
_sum(x, dims::Nothing) = reduceblock(sum, sum, x)
Expand Down Expand Up @@ -106,7 +106,7 @@ function Reducedim(op, input, dims)
Reducedim{T,ndims(input)}(op, input, dims)
end

function reduce(f, x::ArrayOp; dims = nothing)
function reduce(f::Function, x::ArrayOp; dims = nothing)
if dims === nothing
return compute(reduce_async(f,x))
elseif dims isa Int
Expand All @@ -115,7 +115,7 @@ function reduce(f, x::ArrayOp; dims = nothing)
Reducedim(f, x, dims::Tuple)
end

function stage(ctx, r::Reducedim)
function stage(ctx::Context, r::Reducedim)
inp = cached_stage(ctx, r.input)
thunks = let op = r.op, dims=r.dims
# do reducedim on each block
Expand Down
24 changes: 11 additions & 13 deletions src/array/matrix.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ function _ctranspose(x::AbstractArray)
Any[delayed(adjoint)(x[j,i]) for i=1:size(x,2), j=1:size(x,1)]
end

function stage(ctx, node::Transpose)
function stage(ctx::Context, node::Transpose)
inp = cached_stage(ctx, node.input)
thunks = _ctranspose(chunks(inp))
DArray(eltype(inp), domain(inp)', domainchunks(inp)', thunks)
Expand Down Expand Up @@ -121,7 +121,7 @@ function _mul(a::Vector, b::Vector; T=eltype(b))
[x * b[1] for x in a]
end

function promote_distribution(ctx, m::MatMul, a,b)
function promote_distribution(ctx::Context, m::MatMul, a,b)
iscompat = try domain(a) * domain(b); true
catch e; false end
if iscompat
Expand All @@ -135,7 +135,7 @@ function promote_distribution(ctx, m::MatMul, a,b)
a, cached_stage(ctx, Distribute(d, b))
end

function stage_operands(ctx, m::MatMul, a, b)
function stage_operands(ctx::Context, m::MatMul, a, b)
if size(a, 2) != size(b, 1)
error(DimensionMismatch("Inputs to * have incompatible size"))
end
Expand All @@ -146,10 +146,8 @@ function stage_operands(ctx, m::MatMul, a, b)
promote_distribution(ctx, m, stg_a, stg_b)
end

"""
an operand which should be distributed as per convenience
"""
function stage_operands(ctx, ::MatMul, a::ArrayOp, b::PromotePartition{T,1}) where T
"An operand which should be distributed as per convenience"
function stage_operands(ctx::Context, ::MatMul, a::ArrayOp, b::PromotePartition{T,1}) where T
stg_a = cached_stage(ctx, a)
dmn_a = domain(stg_a)
dchunks_a = domainchunks(stg_a)
Expand All @@ -162,7 +160,7 @@ function stage_operands(ctx, ::MatMul, a::ArrayOp, b::PromotePartition{T,1}) whe
stg_a, cached_stage(ctx, Distribute(dmn_out, b.data))
end

function stage_operands(ctx, ::MatMul, a::PromotePartition, b::ArrayOp)
function stage_operands(ctx::Context, ::MatMul, a::PromotePartition, b::ArrayOp)

if size(a, 2) != size(b, 1)
throw(DimensionMismatch("Cannot promote array of domain $(dmn_b) to multiply with an array of size $(dmn_a)"))
Expand All @@ -174,7 +172,7 @@ function stage_operands(ctx, ::MatMul, a::PromotePartition, b::ArrayOp)
cached_stage(ctx, Distribute(dmn_out, a.data)), stg_b
end

function stage(ctx, mul::MatMul)
function stage(ctx::Context, mul::MatMul)
a, b = stage_operands(ctx, mul, mul.a, mul.b)
d = domain(a)*domain(b)
DArray(Any, d, domainchunks(a)*domainchunks(b),
Expand All @@ -199,13 +197,13 @@ scale(l::Vector, r::ArrayOp) = scale(PromotePartition(l), r)
(*)(l::Diagonal, r::ArrayOp) = Scale(PromotePartition(l.diag), r)
scale(l::ArrayOp, r::ArrayOp) = Scale(l, r)

function stage_operand(ctx, ::Scale, a, b::PromotePartition)
function stage_operand(ctx::Context, ::Scale, a, b::PromotePartition)
ps = domainchunks(a)
b_parts = DomainBlocks((1,), (ps.cumlength[1],))
cached_stage(ctx, Distribute(b_parts, b.data))
end

function stage_operand(ctx, ::Scale, a, b)
function stage_operand(ctx::Context, ::Scale, a, b)
cached_stage(ctx, b)
end

Expand All @@ -217,7 +215,7 @@ function _scale(l, r)
res
end

function stage(ctx, scal::Scale)
function stage(ctx::Context, scal::Scale)
r = cached_stage(ctx, scal.r)
l = stage_operand(ctx, scal, r, scal.l)

Expand Down Expand Up @@ -250,7 +248,7 @@ function Base.cat(d::ArrayDomain, ds::ArrayDomain...; dims::Int)
ArrayDomain(out_idxs)
end

function stage(ctx, c::Concat)
function stage(ctx::Context, c::Concat)
inp = Any[cached_stage(ctx, x) for x in c.inputs]

dmns = map(domain, inp)
Expand Down
14 changes: 7 additions & 7 deletions src/array/operators.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Base.Broadcast: Broadcasted, BroadcastStyle, combine_eltypes

"""
This is a way of suggesting that stage should call
stage_operand with the operation and other arguments
stage_operand with the operation and other arguments.
"""
struct PromotePartition{T,N} <: ArrayOp{T,N}
data::AbstractArray{T,N}
Expand All @@ -28,17 +28,17 @@ BCast(b::Broadcasted) = BCast{typeof(b), combine_eltypes(b.f, b.args), length(ax

size(x::BCast) = map(length, axes(x.bcasted))

function stage_operands(ctx, ::BCast, xs::ArrayOp...)
function stage_operands(ctx::Context, ::BCast, xs::ArrayOp...)
map(x->cached_stage(ctx, x), xs)
end

function stage_operands(ctx, ::BCast, x::ArrayOp, y::PromotePartition)
function stage_operands(ctx::Context, ::BCast, x::ArrayOp, y::PromotePartition)
stg_x = cached_stage(ctx, x)
y1 = Distribute(domain(stg_x), y.data)
stg_x, cached_stage(ctx, y1)
end

function stage_operands(ctx, ::BCast, x::PromotePartition, y::ArrayOp)
function stage_operands(ctx::Context, ::BCast, x::PromotePartition, y::ArrayOp)
stg_y = cached_stage(ctx, y)
x1 = Distribute(domain(stg_y), x.data)
cached_stage(ctx, x1), stg_y
Expand All @@ -54,7 +54,7 @@ function Base.copy(b::Broadcast.Broadcasted{<:DaggerBroadcastStyle})
BCast(b)
end

function stage(ctx, node::BCast)
function stage(ctx::Context, node::BCast)
bc = Broadcast.flatten(node.bcasted)
args = bc.args
args1 = map(args) do x
Expand Down Expand Up @@ -102,9 +102,9 @@ struct MapChunk{F, Ni, T, Nd} <: ArrayOp{T, Nd}
input::NTuple{Ni, ArrayOp{T,Nd}}
end

mapchunk(f, xs::ArrayOp...) = MapChunk(f, xs)
mapchunk(f::Function, xs::ArrayOp...) = MapChunk(f, xs)
Base.@deprecate mappart(args...) mapchunk(args...)
function stage(ctx, node::MapChunk)
function stage(ctx::Context, node::MapChunk)
inputs = map(x->cached_stage(ctx, x), node.input)
thunks = map(map(chunks, inputs)...) do ps...
Thunk(node.f, ps...)
Expand Down
2 changes: 1 addition & 1 deletion src/array/setindex.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ function setindex(x::ArrayOp, val, idxs...)
SetIndex(x, idxs, val)
end

function stage(ctx, sidx::SetIndex)
function stage(ctx::Context, sidx::SetIndex)
inp = cached_stage(ctx, sidx.input)

dmn = domain(inp)
Expand Down
Empty file removed src/array/show.jl
Empty file.
Loading

0 comments on commit 547f2ad

Please sign in to comment.