Skip to content

Commit

Permalink
remove Domain and AbstractChunk types - move array domain stuff to da…
Browse files Browse the repository at this point in the history
…rray.jl
  • Loading branch information
Shashi Gowda committed Jun 8, 2017
1 parent 03757c3 commit 96f1e21
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 138 deletions.
5 changes: 2 additions & 3 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ include("lib/logging.jl")
include("lib/dumbref.jl")

# Data and slices of data
include("thunk.jl")
include("domain.jl")
include("lib/blocked-domains.jl")
include("chunks.jl")

# Task scheduling
include("processor.jl")
include("thunk.jl")
include("compute.jl")

# Array computations
include("array/lazy-array.jl")
include("array/darray.jl")
include("array/alloc.jl")
include("array/map-reduce.jl")

Expand Down
80 changes: 76 additions & 4 deletions src/array/lazy-array.jl → src/array/darray.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,75 @@
import Base: ==
using Compat


###### Array Domains ######

if VERSION >= v"0.6.0-dev"
# TODO: Fix this better!
immutable ArrayDomain{N}
indexes::NTuple{N, Any}
end
else
immutable ArrayDomain{N}
indexes::NTuple{N}
end
end

include("../lib/domain-blocks.jl")


ArrayDomain(xs...) = ArrayDomain(xs)
ArrayDomain(xs::Array) = ArrayDomain((xs...,))

indexes(a::ArrayDomain) = a.indexes
chunks{N}(a::ArrayDomain{N}) = DomainBlocks(
ntuple(i->first(indexes(a)[i]), Val{N}), map(x->[length(x)], indexes(a)))

(==)(a::ArrayDomain, b::ArrayDomain) = indexes(a) == indexes(b)
Base.getindex(arr::AbstractArray, d::ArrayDomain) = arr[indexes(d)...]

function intersect(a::ArrayDomain, b::ArrayDomain)
if a === b
return a
end
ArrayDomain(map((x, y) -> _intersect(x, y), indexes(a), indexes(b)))
end

function project(a::ArrayDomain, b::ArrayDomain)
map(indexes(a), indexes(b)) do p, q
q - (first(p) - 1)
end |> ArrayDomain
end

function getindex(a::ArrayDomain, b::ArrayDomain)
ArrayDomain(map(getindex, indexes(a), indexes(b)))
end

"""
alignfirst(a)
Make a subdomain a standalone domain. For example,
alignfirst(ArrayDomain(11:25, 21:100))
# => ArrayDomain((1:15), (1:80))
"""
alignfirst(a::ArrayDomain) =
ArrayDomain(map(r->1:length(r), indexes(a)))

function size(a::ArrayDomain, dim)
idxs = indexes(a)
length(idxs) < dim ? 1 : length(idxs[dim])
end
size(a::ArrayDomain) = map(length, indexes(a))
length(a::ArrayDomain) = prod(size(a))
ndims(a::ArrayDomain) = length(size(a))
isempty(a::ArrayDomain) = length(a) == 0


"The domain of an array is a ArrayDomain"
domain(x::AbstractArray) = ArrayDomain([1:l for l in size(x)])


@compat abstract type ArrayOp{T, N} <: AbstractArray{T, N} end
@compat Base.IndexStyle(::Type{<:ArrayOp}) = IndexCartesian()

Expand All @@ -22,8 +91,8 @@ end

type DArray{T,N} <: ArrayOp{T, N}
domain::ArrayDomain{N}
subdomains::AbstractArray
chunks::AbstractArray
subdomains::AbstractArray{ArrayDomain{N}, N}
chunks::AbstractArray{Union{Chunk,Thunk}, N}
end

domain(d::DArray) = d.domain
Expand Down Expand Up @@ -146,7 +215,7 @@ _cumsum(x::AbstractArray) = length(x) == 0 ? Int[] : cumsum(x)
function lookup_parts{N}(ps::AbstractArray, subdmns::DomainBlocks{N}, d::ArrayDomain{N})
groups = map(group_indices, subdmns.cumlength, indexes(d))
sz = map(length, groups)
pieces = Array{AbstractChunk}(sz)
pieces = Array{Union{Chunk,Thunk}}(sz)
for i = CartesianRange(sz)
idx_and_dmn = map(getindex, groups, i.I)
idx = map(x->x[1], idx_and_dmn)
Expand Down Expand Up @@ -187,7 +256,7 @@ function thunkize(ctx, c::DArray; persist=true)
Thunk(thunks...; meta=true) do results...
t = eltype(results[1])
DArray{t, ndims(dmn)}(dmn, dmnchunks,
reshape(AbstractChunk[results...], sz))
reshape(Union{Chunk,Thunk}[results...], sz))
end
else
c
Expand Down Expand Up @@ -218,3 +287,6 @@ function cached_stage(ctx, x)
cache[x] = stage(ctx, x)
end
end

Base.@deprecate_binding Cat DArray
Base.@deprecate_binding ComputedArray DArray
12 changes: 6 additions & 6 deletions src/array/matrix.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ function size(x::Transpose)
end

ctranspose(x::ArrayOp) = Transpose(ctranspose, x)
ctranspose(x::AbstractChunk) = Thunk(ctranspose, x)
ctranspose(x::Union{Chunk, Thunk}) = Thunk(ctranspose, x)

transpose(x::ArrayOp) = Transpose(transpose, x)
transpose(x::AbstractChunk) = Thunk(transpose, x)
transpose(x::Union{Chunk, Thunk}) = Thunk(transpose, x)

function ctranspose(x::ArrayDomain{2})
d = indexes(x)
Expand All @@ -48,13 +48,13 @@ export Distribute

immutable Distribute{N, T} <: ArrayOp{N, T}
domainchunks
data::AbstractChunk
data::Union{Chunk, Thunk}
end

Distribute(dmn, data) =
Distribute(dmn, persist!(tochunk(data)))

Distribute(d, p::AbstractChunk) =
Distribute(d, p::Union{Chunk, Thunk}) =
Distribute{eltype(chunktype(p)), ndims(d)}(d, p)

size(x::Distribute) = size(domain(x.data))
Expand Down Expand Up @@ -121,8 +121,8 @@ function (+)(a::ArrayDomain, b::ArrayDomain)
a
end

(*)(a::AbstractChunk, b::AbstractChunk) = Thunk(*, a,b)
(+)(a::AbstractChunk, b::AbstractChunk) = Thunk(+, a,b)
(*)(a::Union{Chunk, Thunk}, b::Union{Chunk, Thunk}) = Thunk(*, a,b)
(+)(a::Union{Chunk, Thunk}, b::Union{Chunk, Thunk}) = Thunk(+, a,b)

# we define our own matmat and matvec multiply
# for computing the new domains and thunks.
Expand Down
2 changes: 1 addition & 1 deletion src/array/setindex.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ function stage(ctx, sidx::SetIndex)

groups = map(group_indices, subdmns.cumlength, indexes(d))
sz = map(length, groups)
pieces = Array{AbstractChunk}(sz)
pieces = Array{Union{Chunk, Thunk}}(sz)
for i = CartesianRange(sz)
idx_and_dmn = map(getindex, groups, i.I)
idx = map(x->x[1], idx_and_dmn)
Expand Down
35 changes: 20 additions & 15 deletions src/chunks.jl
Original file line number Diff line number Diff line change
@@ -1,32 +1,37 @@

export chunk, gather

"""
A Chunk is a recipe to read an object. The Chunk type holds information about the
domain spanned by the chunk on its own and metadata to read the chunk from its
memory / storage / network location.
export domain, UnitDomain, project, alignfirst, ArrayDomain

`gather(reader, handle)` will bring the data to memory on the caller
"""
@compat abstract type AbstractChunk end
import Base: isempty, getindex, intersect,
==, size, length, ndims

chunks(x::AbstractChunk) = x
affinity(::AbstractChunk) = []
"""
domain(x::T)
Returns metadata about `x`. This metadata will be in the `domain`
field of a Chunk object when an object of type `T` is created as
the result of evaluating a Thunk.
"""
gather(context, chunk::AbstractChunk)
function domain end

Get the data stored in a chunk
"""
function gather end
Default domain -- has no information about the value
"""
immutable UnitDomain end

"""
If no `domain` method is defined on an object, then
we use the `UnitDomain` on it. A `UnitDomain` is indivisible.
"""
domain(x::Any) = UnitDomain()

###### Chunk ######

"""
A chunk with some data
"""
type Chunk{T, H} <: AbstractChunk
type Chunk{T, H}
chunktype::Type{T}
domain
handle::H
Expand Down Expand Up @@ -71,7 +76,7 @@ function tochunk(x; persist=false)
ref = make_token(x)
Chunk(typeof(x), domain(x), ref, persist)
end
tochunk(x::AbstractChunk) = x
tochunk(x::Union{Chunk, Thunk}) = x

# Check to see if the node is set to persist
# if it is foce can override it
Expand All @@ -83,7 +88,7 @@ end
free!(x; force=true,cache=false) = x # catch-all for non-chunks


Base.@deprecate_binding AbstractPart AbstractChunk
Base.@deprecate_binding AbstractPart Union{Chunk, Thunk}
Base.@deprecate_binding Part Chunk
Base.@deprecate parts(args...) chunks(args...)
Base.@deprecate part(args...) tochunk(args...)
Expand Down
2 changes: 1 addition & 1 deletion src/compute.jl
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ function start_state(deps::Dict, node_order)
end

_move(ctx, to_proc, x) = x
_move(ctx, to_proc::OSProc, x::AbstractChunk) = gather(ctx, x)
_move(ctx, to_proc::OSProc, x::Union{Chunk, Thunk}) = gather(ctx, x)

function do_task(ctx, proc, thunk_id, f, data, send_result, persist)
@dbg timespan_start(ctx, :comm, thunk_id, proc)
Expand Down
99 changes: 0 additions & 99 deletions src/domain.jl
Original file line number Diff line number Diff line change
@@ -1,99 +0,0 @@

export domain, UnitDomain, project, alignfirst, ArrayDomain

import Base: isempty, getindex, intersect,
==, size, length, ndims

"""
Default domain
"""
immutable UnitDomain end

"""
domain(x::T)
The domain is the range of indexes/keys of an object.
The domain is partitioned when you distribute an object.
`domain(x)` returns the domain of `x`.
when an object is partitioned, subsets of the domain are
assigned to different processes or files.
see also `partition`.
"""
function domain end

"""
If no `domain` method is defined on an object, then
we use the `UnitDomain` on it. A `UnitDomain` is indivisible.
"""
domain(x::Any) = UnitDomain()

isempty(::UnitDomain) = false


###### Array Domains ######

if VERSION >= v"0.6.0-dev"
# TODO: Fix this better!
immutable ArrayDomain{N}
indexes::NTuple{N, Any}
end
else
immutable ArrayDomain{N}
indexes::NTuple{N}
end
end

ArrayDomain(xs...) = ArrayDomain(xs)
ArrayDomain(xs::Array) = ArrayDomain((xs...,))

indexes(a::ArrayDomain) = a.indexes
chunks{N}(a::ArrayDomain{N}) = DomainBlocks(
ntuple(i->first(indexes(a)[i]), Val{N}), map(x->[length(x)], indexes(a)))

(==)(a::ArrayDomain, b::ArrayDomain) = indexes(a) == indexes(b)
Base.getindex(arr::AbstractArray, d::ArrayDomain) = arr[indexes(d)...]

function intersect(a::ArrayDomain, b::ArrayDomain)
if a === b
return a
end
ArrayDomain(map((x, y) -> _intersect(x, y), indexes(a), indexes(b)))
end

function project(a::ArrayDomain, b::ArrayDomain)
map(indexes(a), indexes(b)) do p, q
q - (first(p) - 1)
end |> ArrayDomain
end

function getindex(a::ArrayDomain, b::ArrayDomain)
ArrayDomain(map(getindex, indexes(a), indexes(b)))
end

"""
alignfirst(a)
Make a subdomain a standalone domain. For example,
alignfirst(ArrayDomain(11:25, 21:100))
# => ArrayDomain((1:15), (1:80))
"""
alignfirst(a::ArrayDomain) =
ArrayDomain(map(r->1:length(r), indexes(a)))

function size(a::ArrayDomain, dim)
idxs = indexes(a)
length(idxs) < dim ? 1 : length(idxs[dim])
end
size(a::ArrayDomain) = map(length, indexes(a))
length(a::ArrayDomain) = prod(size(a))
ndims(a::ArrayDomain) = length(size(a))
isempty(a::ArrayDomain) = length(a) == 0


"The domain of an array is a ArrayDomain"
domain(x::AbstractArray) = ArrayDomain([1:l for l in size(x)])

10 changes: 5 additions & 5 deletions src/file-io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ const CAT = 0x01
# subparts are saved as Parts

"""
save(ctx, chunk::AbstractChunk, file_path::AbsractString)
save(ctx, chunk::Union{Chunk, Thunk}, file_path::AbsractString)
Save a chunk to a file at `file_path`.
"""
function save(ctx, chunk::AbstractChunk, file_path::AbstractString)
function save(ctx, chunk::Union{Chunk, Thunk}, file_path::AbstractString)
open(file_path, "w") do io
save(ctx, io, chunk, file_path)
end
Expand Down Expand Up @@ -105,7 +105,7 @@ function save{X}(ctx, chunk::Chunk{X, FileReader}, file_path::AbstractString)
end
end

save(chunk::AbstractChunk, file_path::AbstractString) = save(Context(), chunk, file_path)
save(chunk::Union{Chunk, Thunk}, file_path::AbstractString) = save(Context(), chunk, file_path)



Expand All @@ -114,7 +114,7 @@ save(chunk::AbstractChunk, file_path::AbstractString) = save(Context(), chunk, f
"""
load(ctx, file_path)
Load an AbstractChunk from a file.
Load an Union{Chunk, Thunk} from a file.
"""
function load(ctx, file_path::AbstractString; mmap=false)

Expand Down Expand Up @@ -276,7 +276,7 @@ function stage(ctx, s::Save)
sz = size(chunks(x))
function save_cat_meta(chunks...)
f = open(s.name, "w")
saved_parts = reshape(AbstractChunk[c for c in chunks], sz)
saved_parts = reshape(Union{Chunk, Thunk}[c for c in chunks], sz)
res = save(ctx, f, x, s.name, saved_parts)
close(f)
res
Expand Down
File renamed without changes.

0 comments on commit 96f1e21

Please sign in to comment.